2020
2121import com .clearspring .analytics .stream .cardinality .HyperLogLog ;
2222import com .google .common .base .Preconditions ;
23+ import java .util .BitSet ;
2324import java .util .List ;
2425import java .util .Map ;
2526import javax .annotation .Nullable ;
3738import org .apache .pinot .segment .spi .index .reader .Dictionary ;
3839import org .apache .pinot .spi .data .FieldSpec .DataType ;
3940import org .apache .pinot .spi .utils .CommonConstants ;
40- import org .roaringbitmap .PeekableIntIterator ;
41- import org .roaringbitmap .RoaringBitmap ;
4241
4342
4443public class DistinctCountHLLAggregationFunction extends BaseSingleInputAggregationFunction <HyperLogLog , Long > {
45- // When the dictionary size exceeds this threshold, dictionary IDs are offered directly to HyperLogLog
46- // rather than being collected in a RoaringBitmap for deduplication first. For high-cardinality columns,
47- // this avoids the O(n log n) cost of bitmap insertions and provides significant speedup.
48- //
49- // 100K is chosen as the crossover point where direct-HLL becomes faster than bitmap dedup:
50- // - Below 100K: RoaringBitmap is compact (~12KB), insertions are cheap, and pre-deduplication
51- // marginally improves HLL accuracy by reducing duplicate offers before finalization.
52- // - Above 100K: bitmap memory and insertion cost dominate; HLL's ~0.8% error (log2m=12)
53- // makes exact pre-deduplication negligible for correctness anyway.
54- // This default matches DISTINCT_COUNT_SMART_HLL's dictThreshold default (see #17411).
55- public static final int DEFAULT_DICT_SIZE_THRESHOLD = 100_000 ;
56-
5744 protected final int _log2m ;
58- protected final int _dictSizeThreshold ;
5945
6046 public DistinctCountHLLAggregationFunction (List <ExpressionContext > arguments ) {
6147 super (arguments .get (0 ));
6248 int numExpressions = arguments .size ();
63- // This function expects 1, 2, or 3 arguments.
64- Preconditions .checkArgument (numExpressions <= 3 , "DistinctCountHLL expects 1, 2, or 3 arguments, got: %s" ,
49+ // This function expects 1 or 2 arguments.
50+ Preconditions .checkArgument (numExpressions <= 2 , "DistinctCountHLL expects 1 or 2 arguments, got: %s" ,
6551 numExpressions );
66- if (numExpressions > = 2 ) {
52+ if (numExpressions = = 2 ) {
6753 _log2m = arguments .get (1 ).getLiteral ().getIntValue ();
6854 } else {
6955 _log2m = CommonConstants .Helix .DEFAULT_HYPERLOGLOG_LOG2M ;
7056 }
71- if (numExpressions >= 3 ) {
72- int dictSizeThreshold = arguments .get (2 ).getLiteral ().getIntValue ();
73- _dictSizeThreshold = dictSizeThreshold > 0 ? dictSizeThreshold : Integer .MAX_VALUE ;
74- } else {
75- _dictSizeThreshold = DEFAULT_DICT_SIZE_THRESHOLD ;
76- }
7757 }
7858
7959 public int getLog2m () {
8060 return _log2m ;
8161 }
8262
83- public int getDictSizeThreshold () {
84- return _dictSizeThreshold ;
85- }
86-
8763 @ Override
8864 public AggregationFunctionType getType () {
8965 return AggregationFunctionType .DISTINCTCOUNTHLL ;
@@ -136,21 +112,13 @@ public void aggregate(int length, AggregationResultHolder aggregationResultHolde
136112
137113 protected void aggregateSV (int length , AggregationResultHolder aggregationResultHolder , BlockValSet blockValSet ,
138114 DataType storedType ) {
139- // For dictionary-encoded expression, use adaptive strategy based on dictionary size
115+ // For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
116+ // BitSet gives O(1) insertion with no container-switching overhead (unlike RoaringBitmap), and uses
117+ // dictSize/8 bytes of memory (e.g. 128 KB for a 1M-entry dictionary).
140118 Dictionary dictionary = blockValSet .getDictionary ();
141119 if (dictionary != null ) {
142120 int [] dictIds = blockValSet .getDictionaryIdsSV ();
143- if (dictionary .length () > _dictSizeThreshold ) {
144- // High-cardinality dictionary: bypass RoaringBitmap and offer values directly to HLL.
145- // Avoids O(n log n) bitmap insertion cost at the expense of approximate deduplication,
146- // which is acceptable since DISTINCTCOUNTHLL already returns an approximate result.
147- HyperLogLog hyperLogLog = getHyperLogLog (aggregationResultHolder );
148- for (int i = 0 ; i < length ; i ++) {
149- hyperLogLog .offer (dictionary .get (dictIds [i ]));
150- }
151- } else {
152- getDictIdBitmap (aggregationResultHolder , dictionary ).addN (dictIds , 0 , length );
153- }
121+ getDictIdBitSet (aggregationResultHolder , dictionary ).addDictIds (dictIds , length );
154122 return ;
155123 }
156124
@@ -194,22 +162,13 @@ protected void aggregateSV(int length, AggregationResultHolder aggregationResult
194162
195163 protected void aggregateMV (int length , AggregationResultHolder aggregationResultHolder , BlockValSet blockValSet ,
196164 DataType storedType ) {
197- // For dictionary-encoded expression, use adaptive strategy based on dictionary size
165+ // For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
198166 Dictionary dictionary = blockValSet .getDictionary ();
199167 if (dictionary != null ) {
200168 int [][] dictIds = blockValSet .getDictionaryIdsMV ();
201- if (dictionary .length () > _dictSizeThreshold ) {
202- HyperLogLog hyperLogLog = getHyperLogLog (aggregationResultHolder );
203- for (int i = 0 ; i < length ; i ++) {
204- for (int dictId : dictIds [i ]) {
205- hyperLogLog .offer (dictionary .get (dictId ));
206- }
207- }
208- } else {
209- RoaringBitmap dictIdBitmap = getDictIdBitmap (aggregationResultHolder , dictionary );
210- for (int i = 0 ; i < length ; i ++) {
211- dictIdBitmap .add (dictIds [i ]);
212- }
169+ DictIdsWrapper dictIdsWrapper = getDictIdBitSet (aggregationResultHolder , dictionary );
170+ for (int i = 0 ; i < length ; i ++) {
171+ dictIdsWrapper .addDictIds (dictIds [i ]);
213172 }
214173 return ;
215174 }
@@ -297,18 +256,12 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
297256
298257 protected void aggregateSVGroupBySV (int length , int [] groupKeyArray , GroupByResultHolder groupByResultHolder ,
299258 BlockValSet blockValSet , DataType storedType ) {
300- // For dictionary-encoded expression, use adaptive strategy based on dictionary size
259+ // For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
301260 Dictionary dictionary = blockValSet .getDictionary ();
302261 if (dictionary != null ) {
303262 int [] dictIds = blockValSet .getDictionaryIdsSV ();
304- if (dictionary .length () > _dictSizeThreshold ) {
305- for (int i = 0 ; i < length ; i ++) {
306- getHyperLogLog (groupByResultHolder , groupKeyArray [i ]).offer (dictionary .get (dictIds [i ]));
307- }
308- } else {
309- for (int i = 0 ; i < length ; i ++) {
310- getDictIdBitmap (groupByResultHolder , groupKeyArray [i ], dictionary ).add (dictIds [i ]);
311- }
263+ for (int i = 0 ; i < length ; i ++) {
264+ getDictIdBitSet (groupByResultHolder , groupKeyArray [i ], dictionary ).set (dictIds [i ]);
312265 }
313266 return ;
314267 }
@@ -352,21 +305,13 @@ protected void aggregateSVGroupBySV(int length, int[] groupKeyArray, GroupByResu
352305
353306 protected void aggregateMVGroupBySV (int length , int [] groupKeyArray , GroupByResultHolder groupByResultHolder ,
354307 BlockValSet blockValSet , DataType storedType ) {
355- // For dictionary-encoded expression, use adaptive strategy based on dictionary size
308+ // For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
356309 Dictionary dictionary = blockValSet .getDictionary ();
357310 if (dictionary != null ) {
358311 int [][] dictIds = blockValSet .getDictionaryIdsMV ();
359- if (dictionary .length () > _dictSizeThreshold ) {
360- for (int i = 0 ; i < length ; i ++) {
361- HyperLogLog hyperLogLog = getHyperLogLog (groupByResultHolder , groupKeyArray [i ]);
362- for (int dictId : dictIds [i ]) {
363- hyperLogLog .offer (dictionary .get (dictId ));
364- }
365- }
366- } else {
367- for (int i = 0 ; i < length ; i ++) {
368- getDictIdBitmap (groupByResultHolder , groupKeyArray [i ], dictionary ).add (dictIds [i ]);
369- }
312+ for (int i = 0 ; i < length ; i ++) {
313+ DictIdsWrapper dictIdsWrapper = getDictIdBitSet (groupByResultHolder , groupKeyArray [i ], dictionary );
314+ dictIdsWrapper .addDictIds (dictIds [i ]);
370315 }
371316 return ;
372317 }
@@ -461,20 +406,14 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult
461406
462407 protected void aggregateSVGroupByMV (int length , int [][] groupKeysArray , GroupByResultHolder groupByResultHolder ,
463408 BlockValSet blockValSet , DataType storedType ) {
464- // For dictionary-encoded expression, use adaptive strategy based on dictionary size
409+ // For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
465410 Dictionary dictionary = blockValSet .getDictionary ();
466411 if (dictionary != null ) {
467412 int [] dictIds = blockValSet .getDictionaryIdsSV ();
468- if (dictionary .length () > _dictSizeThreshold ) {
469- for (int i = 0 ; i < length ; i ++) {
470- Object value = dictionary .get (dictIds [i ]);
471- for (int groupKey : groupKeysArray [i ]) {
472- getHyperLogLog (groupByResultHolder , groupKey ).offer (value );
473- }
474- }
475- } else {
476- for (int i = 0 ; i < length ; i ++) {
477- setDictIdForGroupKeys (groupByResultHolder , groupKeysArray [i ], dictionary , dictIds [i ]);
413+ for (int i = 0 ; i < length ; i ++) {
414+ int dictId = dictIds [i ];
415+ for (int groupKey : groupKeysArray [i ]) {
416+ getDictIdBitSet (groupByResultHolder , groupKey , dictionary ).set (dictId );
478417 }
479418 }
480419 return ;
@@ -519,25 +458,14 @@ protected void aggregateSVGroupByMV(int length, int[][] groupKeysArray, GroupByR
519458
520459 protected void aggregateMVGroupByMV (int length , int [][] groupKeysArray , GroupByResultHolder groupByResultHolder ,
521460 BlockValSet blockValSet , DataType storedType ) {
522- // For dictionary-encoded expression, use adaptive strategy based on dictionary size
461+ // For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
523462 Dictionary dictionary = blockValSet .getDictionary ();
524463 if (dictionary != null ) {
525464 int [][] dictIds = blockValSet .getDictionaryIdsMV ();
526- if (dictionary .length () > _dictSizeThreshold ) {
527- for (int i = 0 ; i < length ; i ++) {
528- int [] rowDictIds = dictIds [i ];
529- for (int groupKey : groupKeysArray [i ]) {
530- HyperLogLog hyperLogLog = getHyperLogLog (groupByResultHolder , groupKey );
531- for (int dictId : rowDictIds ) {
532- hyperLogLog .offer (dictionary .get (dictId ));
533- }
534- }
535- }
536- } else {
537- for (int i = 0 ; i < length ; i ++) {
538- for (int groupKey : groupKeysArray [i ]) {
539- getDictIdBitmap (groupByResultHolder , groupKey , dictionary ).add (dictIds [i ]);
540- }
465+ for (int i = 0 ; i < length ; i ++) {
466+ int [] rowDictIds = dictIds [i ];
467+ for (int groupKey : groupKeysArray [i ]) {
468+ getDictIdBitSet (groupByResultHolder , groupKey , dictionary ).addDictIds (rowDictIds );
541469 }
542470 }
543471 return ;
@@ -708,80 +636,53 @@ public boolean canUseStarTree(Map<String, Object> functionParameters) {
708636 }
709637
710638 /**
711- * Returns the dictionary id bitmap from the result holder or creates a new one if it does not exist .
639+ * Returns the {@link DictIdsWrapper} from the result holder, creating a new one if absent .
712640 */
713- protected static RoaringBitmap getDictIdBitmap (AggregationResultHolder aggregationResultHolder ,
641+ protected static DictIdsWrapper getDictIdBitSet (AggregationResultHolder aggregationResultHolder ,
714642 Dictionary dictionary ) {
715643 DictIdsWrapper dictIdsWrapper = aggregationResultHolder .getResult ();
716644 if (dictIdsWrapper == null ) {
717645 dictIdsWrapper = new DictIdsWrapper (dictionary );
718646 aggregationResultHolder .setValue (dictIdsWrapper );
719647 }
720- return dictIdsWrapper . _dictIdBitmap ;
648+ return dictIdsWrapper ;
721649 }
722650
723651 /**
724652 * Returns the HyperLogLog from the result holder or creates a new one if it does not exist.
725- * If the holder currently contains a {@link DictIdsWrapper} (e.g. because a prior block of a consuming segment used
726- * the bitmap path before the dictionary grew past the threshold), it is converted to a HyperLogLog first so that the
727- * holder always ends up holding a consistent type.
728653 */
729654 protected HyperLogLog getHyperLogLog (AggregationResultHolder aggregationResultHolder ) {
730- Object result = aggregationResultHolder .getResult ();
731- if (result == null ) {
732- HyperLogLog hyperLogLog = new HyperLogLog (_log2m );
655+ HyperLogLog hyperLogLog = aggregationResultHolder .getResult ();
656+ if (hyperLogLog == null ) {
657+ hyperLogLog = new HyperLogLog (_log2m );
733658 aggregationResultHolder .setValue (hyperLogLog );
734- return hyperLogLog ;
735659 }
736- if (result instanceof DictIdsWrapper ) {
737- HyperLogLog hyperLogLog = convertToHyperLogLog ((DictIdsWrapper ) result );
738- aggregationResultHolder .setValue (hyperLogLog );
739- return hyperLogLog ;
740- }
741- return (HyperLogLog ) result ;
660+ return hyperLogLog ;
742661 }
743662
744663 /**
745- * Returns the dictionary id bitmap for the given group key or creates a new one if it does not exist .
664+ * Returns the {@link DictIdsWrapper} for the given group key, creating a new one if absent .
746665 */
747- protected static RoaringBitmap getDictIdBitmap (GroupByResultHolder groupByResultHolder , int groupKey ,
666+ protected static DictIdsWrapper getDictIdBitSet (GroupByResultHolder groupByResultHolder , int groupKey ,
748667 Dictionary dictionary ) {
749668 DictIdsWrapper dictIdsWrapper = groupByResultHolder .getResult (groupKey );
750669 if (dictIdsWrapper == null ) {
751670 dictIdsWrapper = new DictIdsWrapper (dictionary );
752671 groupByResultHolder .setValueForKey (groupKey , dictIdsWrapper );
753672 }
754- return dictIdsWrapper . _dictIdBitmap ;
673+ return dictIdsWrapper ;
755674 }
756675
757676 /**
758677 * Returns the HyperLogLog for the given group key or creates a new one if it does not exist.
759- * If the holder currently contains a {@link DictIdsWrapper} for this group key, it is converted to a HyperLogLog
760- * first (same reasoning as {@link #getHyperLogLog(AggregationResultHolder)}).
761678 */
762679 protected HyperLogLog getHyperLogLog (GroupByResultHolder groupByResultHolder , int groupKey ) {
763- Object result = groupByResultHolder .getResult (groupKey );
764- if (result == null ) {
765- HyperLogLog hyperLogLog = new HyperLogLog (_log2m );
766- groupByResultHolder .setValueForKey (groupKey , hyperLogLog );
767- return hyperLogLog ;
768- }
769- if (result instanceof DictIdsWrapper ) {
770- HyperLogLog hyperLogLog = convertToHyperLogLog ((DictIdsWrapper ) result );
680+ HyperLogLog hyperLogLog = groupByResultHolder .getResult (groupKey );
681+ if (hyperLogLog == null ) {
682+ hyperLogLog = new HyperLogLog (_log2m );
771683 groupByResultHolder .setValueForKey (groupKey , hyperLogLog );
772- return hyperLogLog ;
773- }
774- return (HyperLogLog ) result ;
775- }
776-
777- /**
778- * Helper method to set dictionary id for the given group keys into the result holder.
779- */
780- private static void setDictIdForGroupKeys (GroupByResultHolder groupByResultHolder , int [] groupKeys ,
781- Dictionary dictionary , int dictId ) {
782- for (int groupKey : groupKeys ) {
783- getDictIdBitmap (groupByResultHolder , groupKey , dictionary ).add (dictId );
784684 }
685+ return hyperLogLog ;
785686 }
786687
787688 /**
@@ -794,26 +695,46 @@ private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[]
794695 }
795696
796697 /**
797- * Helper method to read dictionary and convert dictionary ids to HyperLogLog for dictionary-encoded expression .
698+ * Converts a {@link DictIdsWrapper} to a HyperLogLog by offering each distinct dictionary value exactly once .
798699 */
799700 private HyperLogLog convertToHyperLogLog (DictIdsWrapper dictIdsWrapper ) {
800701 HyperLogLog hyperLogLog = new HyperLogLog (_log2m );
801702 Dictionary dictionary = dictIdsWrapper ._dictionary ;
802- RoaringBitmap dictIdBitmap = dictIdsWrapper ._dictIdBitmap ;
803- PeekableIntIterator iterator = dictIdBitmap .getIntIterator ();
804- while (iterator .hasNext ()) {
805- hyperLogLog .offer (dictionary .get (iterator .next ()));
703+ BitSet bitSet = dictIdsWrapper ._bitSet ;
704+ for (int dictId = bitSet .nextSetBit (0 ); dictId >= 0 ; dictId = bitSet .nextSetBit (dictId + 1 )) {
705+ hyperLogLog .offer (dictionary .get (dictId ));
806706 }
807707 return hyperLogLog ;
808708 }
809709
810- private static final class DictIdsWrapper {
710+ /**
711+ * Wraps a {@link Dictionary} with a {@link BitSet} to collect and deduplicate dictionary IDs before offering
712+ * to HyperLogLog. BitSet gives O(1) insertion with no container-management overhead (unlike RoaringBitmap),
713+ * and uses dictSize/8 bytes of memory (e.g. 128 KB for a 1M-entry dictionary).
714+ */
715+ protected static final class DictIdsWrapper {
811716 final Dictionary _dictionary ;
812- final RoaringBitmap _dictIdBitmap ;
717+ final BitSet _bitSet ;
813718
814- private DictIdsWrapper (Dictionary dictionary ) {
719+ DictIdsWrapper (Dictionary dictionary ) {
815720 _dictionary = dictionary ;
816- _dictIdBitmap = new RoaringBitmap ();
721+ _bitSet = new BitSet (dictionary .length ());
722+ }
723+
724+ void set (int dictId ) {
725+ _bitSet .set (dictId );
726+ }
727+
728+ void addDictIds (int [] dictIds ) {
729+ for (int dictId : dictIds ) {
730+ _bitSet .set (dictId );
731+ }
732+ }
733+
734+ void addDictIds (int [] dictIds , int length ) {
735+ for (int i = 0 ; i < length ; i ++) {
736+ _bitSet .set (dictIds [i ]);
737+ }
817738 }
818739 }
819740}
0 commit comments