Skip to content

Commit 86fd8d7

Browse files
Deep Patelclaude
authored andcommitted
Fix OOM in group-by paths: offer dict values directly to HLL instead of per-group BitSet
BitSet deduplication in aggregateSVGroupBySV/aggregateMVGroupBySV/aggregateSVGroupByMV/aggregateMVGroupByMV was allocating one BitSet(dictSize/8 bytes) per group. With 100K groups × 3M-entry dict → 37.5 GB → OOM. HyperLogLog uses max-register semantics so duplicate offer() calls are no-ops — deduplication before HLL is unnecessary for accuracy. For group-by, offers values directly to HLL. BitSet deduplication is kept only for the non-group-by aggregation path (one BitSet per segment, bounded and cheap). Also removes the now-unused getDictIdBitSet(GroupByResultHolder, int, Dictionary) helper and simplifies extractGroupByResult to remove the DictIdsWrapper branch that can no longer occur. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent f2a7bec commit 86fd8d7

1 file changed

Lines changed: 21 additions & 35 deletions

File tree

pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountHLLAggregationFunction.java

Lines changed: 21 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -256,12 +256,16 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol
256256

257257
protected void aggregateSVGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
258258
BlockValSet blockValSet, DataType storedType) {
259-
// For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
259+
// For dictionary-encoded expression, offer dictionary values directly to HyperLogLog.
260+
// Unlike the non-group-by aggregation path (where a single BitSet over the full dict is cheap), the group-by
261+
// path creates one result per group. Pre-allocating a BitSet of dictSize/8 bytes per group would multiply memory
262+
// usage by numGroups (e.g. 100K groups × 375KB = 37.5GB for a 3M-entry dict). Since HLL is an approximation and
263+
// max-register semantics make duplicate offers a no-op, deduplication is unnecessary here.
260264
Dictionary dictionary = blockValSet.getDictionary();
261265
if (dictionary != null) {
262266
int[] dictIds = blockValSet.getDictionaryIdsSV();
263267
for (int i = 0; i < length; i++) {
264-
getDictIdBitSet(groupByResultHolder, groupKeyArray[i], dictionary).set(dictIds[i]);
268+
getHyperLogLog(groupByResultHolder, groupKeyArray[i]).offer(dictionary.get(dictIds[i]));
265269
}
266270
return;
267271
}
@@ -305,13 +309,15 @@ protected void aggregateSVGroupBySV(int length, int[] groupKeyArray, GroupByResu
305309

306310
protected void aggregateMVGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
307311
BlockValSet blockValSet, DataType storedType) {
308-
// For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
312+
// For dictionary-encoded expression, offer dictionary values directly to HyperLogLog (see aggregateSVGroupBySV).
309313
Dictionary dictionary = blockValSet.getDictionary();
310314
if (dictionary != null) {
311315
int[][] dictIds = blockValSet.getDictionaryIdsMV();
312316
for (int i = 0; i < length; i++) {
313-
DictIdsWrapper dictIdsWrapper = getDictIdBitSet(groupByResultHolder, groupKeyArray[i], dictionary);
314-
dictIdsWrapper.addDictIds(dictIds[i]);
317+
HyperLogLog hyperLogLog = getHyperLogLog(groupByResultHolder, groupKeyArray[i]);
318+
for (int dictId : dictIds[i]) {
319+
hyperLogLog.offer(dictionary.get(dictId));
320+
}
315321
}
316322
return;
317323
}
@@ -406,14 +412,14 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult
406412

407413
protected void aggregateSVGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
408414
BlockValSet blockValSet, DataType storedType) {
409-
// For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
415+
// For dictionary-encoded expression, offer dictionary values directly to HyperLogLog (see aggregateSVGroupBySV).
410416
Dictionary dictionary = blockValSet.getDictionary();
411417
if (dictionary != null) {
412418
int[] dictIds = blockValSet.getDictionaryIdsSV();
413419
for (int i = 0; i < length; i++) {
414-
int dictId = dictIds[i];
420+
Object value = dictionary.get(dictIds[i]);
415421
for (int groupKey : groupKeysArray[i]) {
416-
getDictIdBitSet(groupByResultHolder, groupKey, dictionary).set(dictId);
422+
getHyperLogLog(groupByResultHolder, groupKey).offer(value);
417423
}
418424
}
419425
return;
@@ -458,14 +464,17 @@ protected void aggregateSVGroupByMV(int length, int[][] groupKeysArray, GroupByR
458464

459465
protected void aggregateMVGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
460466
BlockValSet blockValSet, DataType storedType) {
461-
// For dictionary-encoded expression, collect dictionary ids into a BitSet for deduplication.
467+
// For dictionary-encoded expression, offer dictionary values directly to HyperLogLog (see aggregateSVGroupBySV).
462468
Dictionary dictionary = blockValSet.getDictionary();
463469
if (dictionary != null) {
464470
int[][] dictIds = blockValSet.getDictionaryIdsMV();
465471
for (int i = 0; i < length; i++) {
466472
int[] rowDictIds = dictIds[i];
467473
for (int groupKey : groupKeysArray[i]) {
468-
getDictIdBitSet(groupByResultHolder, groupKey, dictionary).addDictIds(rowDictIds);
474+
HyperLogLog hyperLogLog = getHyperLogLog(groupByResultHolder, groupKey);
475+
for (int dictId : rowDictIds) {
476+
hyperLogLog.offer(dictionary.get(dictId));
477+
}
469478
}
470479
}
471480
return;
@@ -556,18 +565,8 @@ public HyperLogLog extractAggregationResult(AggregationResultHolder aggregationR
556565

557566
@Override
558567
public HyperLogLog extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
559-
Object result = groupByResultHolder.getResult(groupKey);
560-
if (result == null) {
561-
return new HyperLogLog(_log2m);
562-
}
563-
564-
if (result instanceof DictIdsWrapper) {
565-
// For dictionary-encoded expression, convert dictionary ids to HyperLogLog
566-
return convertToHyperLogLog((DictIdsWrapper) result);
567-
} else {
568-
// For non-dictionary-encoded expression, directly return the HyperLogLog
569-
return (HyperLogLog) result;
570-
}
568+
HyperLogLog hyperLogLog = groupByResultHolder.getResult(groupKey);
569+
return hyperLogLog != null ? hyperLogLog : new HyperLogLog(_log2m);
571570
}
572571

573572
@Override
@@ -660,19 +659,6 @@ protected HyperLogLog getHyperLogLog(AggregationResultHolder aggregationResultHo
660659
return hyperLogLog;
661660
}
662661

663-
/**
664-
* Returns the {@link DictIdsWrapper} for the given group key, creating a new one if absent.
665-
*/
666-
protected static DictIdsWrapper getDictIdBitSet(GroupByResultHolder groupByResultHolder, int groupKey,
667-
Dictionary dictionary) {
668-
DictIdsWrapper dictIdsWrapper = groupByResultHolder.getResult(groupKey);
669-
if (dictIdsWrapper == null) {
670-
dictIdsWrapper = new DictIdsWrapper(dictionary);
671-
groupByResultHolder.setValueForKey(groupKey, dictIdsWrapper);
672-
}
673-
return dictIdsWrapper;
674-
}
675-
676662
/**
677663
* Returns the HyperLogLog for the given group key or creates a new one if it does not exist.
678664
*/

0 commit comments

Comments
 (0)