2424import java .util .List ;
2525import java .util .Map ;
2626import java .util .Optional ;
27+ import java .util .Set ;
2728import javax .annotation .Nullable ;
2829import org .apache .pinot .common .function .JsonPathCache ;
2930import org .apache .pinot .common .request .context .ExpressionContext ;
31+ import org .apache .pinot .common .request .context .FilterContext ;
3032import org .apache .pinot .common .request .context .OrderByExpressionContext ;
33+ import org .apache .pinot .common .request .context .RequestContextUtils ;
34+ import org .apache .pinot .common .request .context .predicate .JsonMatchPredicate ;
35+ import org .apache .pinot .common .request .context .predicate .Predicate ;
3136import org .apache .pinot .common .utils .DataSchema ;
3237import org .apache .pinot .common .utils .DataSchema .ColumnDataType ;
3338import org .apache .pinot .core .common .Operator ;
5459import org .apache .pinot .segment .spi .index .reader .JsonIndexReader ;
5560import org .apache .pinot .spi .data .FieldSpec ;
5661import org .apache .pinot .spi .query .QueryThreadContext ;
62+ import org .apache .pinot .sql .parsers .CalciteSqlParser ;
5763import org .roaringbitmap .RoaringBitmap ;
5864
5965
6066/**
61- * Distinct operator that uses the JSON index value→docId map directly instead of scanning documents.
62- * Avoids the projection/transform pipeline for SELECT DISTINCT jsonExtractIndex(...).
67+ * Distinct operator for the scalar {@code jsonExtractIndex(column, path, type[, defaultValue])} form.
68+ *
69+ * <p>Execution flow:
70+ * 1. Push a same-path {@code JSON_MATCH} predicate into the JSON-index lookup when it cannot match missing paths.
71+ * 2. Convert matching flattened doc ids back to segment doc ids.
72+ * 3. Apply any remaining row-level filter and materialize DISTINCT results, including missing-path handling.
6373 */
6474public class JsonIndexDistinctOperator extends BaseOperator <DistinctResultsBlock > {
6575 private static final String EXPLAIN_NAME = "DISTINCT_JSON_INDEX" ;
@@ -91,10 +101,29 @@ protected DistinctResultsBlock getNextBlock() {
91101 ExpressionContext expr = expressions .get (0 );
92102 ParsedJsonExtractIndex parsed = parseJsonExtractIndex (expr );
93103 if (parsed == null ) {
94- throw new IllegalStateException ("Expected jsonExtractIndex expression" );
104+ throw new IllegalStateException ("Expected 3/4-arg scalar jsonExtractIndex expression" );
95105 }
96106
97- // Evaluate the filter first so we can skip the (potentially expensive) index map when no docs match
107+ DataSource dataSource = _indexSegment .getDataSource (parsed ._columnName , _queryContext .getSchema ());
108+ JsonIndexReader jsonIndexReader = getJsonIndexReader (dataSource );
109+ if (jsonIndexReader == null ) {
110+ throw new IllegalStateException ("Column " + parsed ._columnName + " has no JSON index" );
111+ }
112+
113+ String pushedDownFilterJson = extractSamePathJsonMatchFilter (parsed , _queryContext .getFilter ());
114+ boolean filterFullyPushedDown = pushedDownFilterJson != null
115+ && isOnlySamePathJsonMatchFilter (parsed , _queryContext .getFilter ())
116+ && !jsonMatchFilterCanMatchMissingPath (pushedDownFilterJson );
117+
118+ // Fast path: when the filter is fully pushed down into the JSON index, we only need the distinct value strings.
119+ // This avoids reading posting lists, building per-value bitmaps, and converting flattened doc IDs.
120+ if (filterFullyPushedDown ) {
121+ Set <String > distinctValues = jsonIndexReader .getMatchingDistinctValues (
122+ parsed ._jsonPathString , pushedDownFilterJson );
123+ return buildDistinctResultsFromValues (expr , parsed , distinctValues );
124+ }
125+
126+ // Evaluate the filter first so we can skip the (potentially expensive) index map when no docs match.
98127 RoaringBitmap filteredDocIds = buildFilteredDocIds ();
99128 if (filteredDocIds != null && filteredDocIds .isEmpty ()) {
100129 ColumnDataType earlyColumnDataType = ColumnDataType .fromDataTypeSV (parsed ._dataType );
@@ -107,18 +136,46 @@ protected DistinctResultsBlock getNextBlock() {
107136 createDistinctTable (earlyDataSchema , parsed ._dataType , earlyOrderBy ), _queryContext );
108137 }
109138
110- DataSource dataSource = _indexSegment .getDataSource (parsed ._columnName , _queryContext .getSchema ());
111- JsonIndexReader jsonIndexReader = getJsonIndexReader (dataSource );
112- if (jsonIndexReader == null ) {
113- throw new IllegalStateException ("Column " + parsed ._columnName + " has no JSON index" );
114- }
115-
116- // Same logic as JsonExtractIndexTransformFunction.getValueToMatchingDocsMap()
139+ // All other WHERE filters remain row-level and are applied after converting flattened doc IDs to real doc IDs.
117140 Map <String , RoaringBitmap > valueToMatchingDocs =
118- jsonIndexReader .getMatchingFlattenedDocsMap (parsed ._jsonPathString , parsed ._filterExpression );
141+ jsonIndexReader .getMatchingFlattenedDocsMap (parsed ._jsonPathString , pushedDownFilterJson );
142+
119143 // Always single-value (MV _ARRAY is rejected in parseJsonExtractIndex)
120144 jsonIndexReader .convertFlattenedDocIdsToDocIds (valueToMatchingDocs );
145+ return buildDistinctResultsBlock (expr , parsed , valueToMatchingDocs , filteredDocIds ,
146+ filteredDocIds == null );
147+ }
121148
149+ private DistinctResultsBlock buildDistinctResultsFromValues (ExpressionContext expr , ParsedJsonExtractIndex parsed ,
150+ Set <String > distinctValues ) {
151+ ColumnDataType columnDataType = ColumnDataType .fromDataTypeSV (parsed ._dataType );
152+ DataSchema dataSchema = new DataSchema (
153+ new String []{expr .toString ()},
154+ new ColumnDataType []{columnDataType });
155+ OrderByExpressionContext orderByExpression = _queryContext .getOrderByExpressions () != null
156+ ? _queryContext .getOrderByExpressions ().get (0 ) : null ;
157+ DistinctTable distinctTable = createDistinctTable (dataSchema , parsed ._dataType , orderByExpression );
158+ int limit = _queryContext .getLimit ();
159+
160+ for (String value : distinctValues ) {
161+ _numEntriesExamined ++;
162+ QueryThreadContext .checkTerminationAndSampleUsagePeriodically (_numEntriesExamined , EXPLAIN_NAME );
163+
164+ boolean done = addValueToDistinctTable (distinctTable , value , parsed ._dataType , orderByExpression );
165+ if (done ) {
166+ break ;
167+ }
168+ if (orderByExpression == null && distinctTable .hasLimit () && distinctTable .size () >= limit ) {
169+ break ;
170+ }
171+ }
172+
173+ return new DistinctResultsBlock (distinctTable , _queryContext );
174+ }
175+
176+ private DistinctResultsBlock buildDistinctResultsBlock (ExpressionContext expr , ParsedJsonExtractIndex parsed ,
177+ Map <String , RoaringBitmap > valueToMatchingDocs , @ Nullable RoaringBitmap filteredDocIds ,
178+ boolean allDocsSelected ) {
122179 ColumnDataType columnDataType = ColumnDataType .fromDataTypeSV (parsed ._dataType );
123180 DataSchema dataSchema = new DataSchema (
124181 new String []{expr .toString ()},
@@ -129,32 +186,30 @@ protected DistinctResultsBlock getNextBlock() {
129186
130187 int limit = _queryContext .getLimit ();
131188 int totalDocs = _indexSegment .getSegmentMetadata ().getTotalDocs ();
132- // Track uncovered docs: for the no-filter case, build a union and compare against totalDocs.
133- // For the filtered case, use a "remaining" bitmap that shrinks in-place (no per-value allocation).
134- RoaringBitmap allMatchedDocs = filteredDocIds == null ? new RoaringBitmap () : null ;
189+ RoaringBitmap coveredDocs = allDocsSelected ? new RoaringBitmap () : null ;
135190 RoaringBitmap remainingDocs = filteredDocIds != null ? filteredDocIds .clone () : null ;
136- boolean allDocsCovered = filteredDocIds == null ? ( totalDocs == 0 ) : filteredDocIds .isEmpty ();
191+ boolean allDocsCovered = filteredDocIds == null ? ! allDocsSelected || totalDocs == 0 : filteredDocIds .isEmpty ();
137192 boolean earlyBreak = false ;
138193
139194 for (Map .Entry <String , RoaringBitmap > entry : valueToMatchingDocs .entrySet ()) {
140195 _numEntriesExamined ++;
141196 QueryThreadContext .checkTerminationAndSampleUsagePeriodically (_numEntriesExamined , EXPLAIN_NAME );
197+
142198 String value = entry .getKey ();
143199 RoaringBitmap docIds = entry .getValue ();
144200
145201 boolean includeValue ;
146202 if (filteredDocIds == null ) {
147203 includeValue = true ;
148- // Build union for uncovered-docs detection; short-circuit once all segment docs are covered
149- if (!allDocsCovered ) {
150- allMatchedDocs .or (docIds );
151- if (allMatchedDocs .getLongCardinality () >= totalDocs ) {
204+ if (!allDocsCovered && allDocsSelected ) {
205+ coveredDocs .or (docIds );
206+ if (coveredDocs .getLongCardinality () >= totalDocs ) {
152207 allDocsCovered = true ;
153208 }
154209 }
155210 } else {
156211 includeValue = RoaringBitmap .intersects (docIds , filteredDocIds );
157- // Remove matched docs from remaining set in-place (no allocation per value)
212+ // Remove matched docs from remaining set in-place (no allocation per value).
158213 if (!allDocsCovered && includeValue ) {
159214 remainingDocs .andNot (docIds );
160215 if (remainingDocs .isEmpty ()) {
@@ -177,25 +232,92 @@ protected DistinctResultsBlock getNextBlock() {
177232 }
178233 }
179234
180- // Handle docs not covered by any value in the index.
181- // Baseline JsonExtractIndexTransformFunction throws when a doc is missing the path and no
182- // defaultValue is provided. Match that behavior here unless nullHandling is enabled.
183- // allDocsCovered tracks coverage precisely (against totalDocs or filteredDocIds cardinality).
184235 if (!earlyBreak && !allDocsCovered ) {
185- if (parsed ._defaultValue != null ) {
186- addValueToDistinctTable (distinctTable , parsed ._defaultValue , parsed ._dataType , orderByExpression );
187- } else if (_queryContext .isNullHandlingEnabled ()) {
188- distinctTable .addNull ();
189- } else {
190- throw new RuntimeException (
191- String .format ("Illegal Json Path: [%s], for some docIds in segment [%s]" ,
192- parsed ._jsonPathString , _indexSegment .getSegmentName ()));
193- }
236+ handleMissingDocs (distinctTable , parsed , orderByExpression );
194237 }
195238
196239 return new DistinctResultsBlock (distinctTable , _queryContext );
197240 }
198241
242+ private void handleMissingDocs (DistinctTable distinctTable , ParsedJsonExtractIndex parsed ,
243+ @ Nullable OrderByExpressionContext orderByExpression ) {
244+ if (parsed ._defaultValueLiteral != null ) {
245+ addValueToDistinctTable (distinctTable , parsed ._defaultValueLiteral , parsed ._dataType , orderByExpression );
246+ } else if (_queryContext .isNullHandlingEnabled ()) {
247+ distinctTable .addNull ();
248+ } else {
249+ throw new RuntimeException (
250+ String .format ("Illegal Json Path: [%s], for some docIds in segment [%s]" ,
251+ parsed ._jsonPathString , _indexSegment .getSegmentName ()));
252+ }
253+ }
254+
255+ @ Nullable
256+ private static String extractSamePathJsonMatchFilter (ParsedJsonExtractIndex parsed , @ Nullable FilterContext filter ) {
257+ if (filter == null ) {
258+ return null ;
259+ }
260+ switch (filter .getType ()) {
261+ case PREDICATE :
262+ return extractSamePathJsonMatchFilter (parsed , filter .getPredicate ());
263+ case AND :
264+ String matchingFilter = null ;
265+ for (FilterContext child : filter .getChildren ()) {
266+ String childFilter = extractSamePathJsonMatchFilter (parsed , child );
267+ if (childFilter == null ) {
268+ continue ;
269+ }
270+ if (matchingFilter != null ) {
271+ return null ;
272+ }
273+ matchingFilter = childFilter ;
274+ }
275+ return matchingFilter ;
276+ default :
277+ return null ;
278+ }
279+ }
280+
281+ private static boolean isOnlySamePathJsonMatchFilter (ParsedJsonExtractIndex parsed , @ Nullable FilterContext filter ) {
282+ if (filter == null || filter .getType () != FilterContext .Type .PREDICATE ) {
283+ return false ;
284+ }
285+ return extractSamePathJsonMatchFilter (parsed , filter .getPredicate ()) != null ;
286+ }
287+
288+ private static boolean jsonMatchFilterCanMatchMissingPath (String filterJsonString ) {
289+ try {
290+ FilterContext filter = RequestContextUtils .getFilter (CalciteSqlParser .compileToExpression (filterJsonString ));
291+ return filter .getType () == FilterContext .Type .PREDICATE
292+ && filter .getPredicate ().getType () == Predicate .Type .IS_NULL ;
293+ } catch (Exception e ) {
294+ return false ;
295+ }
296+ }
297+
298+ @ Nullable
299+ private static String extractSamePathJsonMatchFilter (ParsedJsonExtractIndex parsed , Predicate predicate ) {
300+ if (!(predicate instanceof JsonMatchPredicate )) {
301+ return null ;
302+ }
303+ ExpressionContext lhs = predicate .getLhs ();
304+ if (lhs .getType () != ExpressionContext .Type .IDENTIFIER
305+ || !parsed ._columnName .equals (lhs .getIdentifier ())) {
306+ return null ;
307+ }
308+ String filterJsonString = ((JsonMatchPredicate ) predicate ).getValue ();
309+ int start = filterJsonString .indexOf ('"' );
310+ if (start < 0 ) {
311+ return null ;
312+ }
313+ int end = filterJsonString .indexOf ('"' , start + 1 );
314+ if (end < 0 ) {
315+ return null ;
316+ }
317+ String filterPath = filterJsonString .substring (start + 1 , end );
318+ return parsed ._jsonPathString .equals (filterPath ) ? filterJsonString : null ;
319+ }
320+
199321 private DistinctTable createDistinctTable (DataSchema dataSchema , FieldSpec .DataType dataType ,
200322 @ Nullable OrderByExpressionContext orderByExpression ) {
201323 int limit = _queryContext .getLimit ();
@@ -380,14 +502,15 @@ private static ParsedJsonExtractIndex parseJsonExtractIndex(ExpressionContext ex
380502 return null ;
381503 }
382504 List <ExpressionContext > args = expr .getFunction ().getArguments ();
383- if (args .size () < 3 || args .size () > 5 ) {
505+ if (args .size () != 3 && args .size () != 4 ) {
384506 return null ;
385507 }
386508 if (args .get (0 ).getType () != ExpressionContext .Type .IDENTIFIER ) {
387509 return null ;
388510 }
389511 if (args .get (1 ).getType () != ExpressionContext .Type .LITERAL
390- || args .get (2 ).getType () != ExpressionContext .Type .LITERAL ) {
512+ || args .get (2 ).getType () != ExpressionContext .Type .LITERAL
513+ || (args .size () == 4 && args .get (3 ).getType () != ExpressionContext .Type .LITERAL )) {
391514 return null ;
392515 }
393516
@@ -428,41 +551,32 @@ private static ParsedJsonExtractIndex parseJsonExtractIndex(ExpressionContext ex
428551 return null ;
429552 }
430553
431- String defaultValue = null ;
432- if (args .size () >= 4 ) {
433- if (args .get (3 ).getType () != ExpressionContext .Type .LITERAL ) {
554+ String defaultValueLiteral = null ;
555+ if (args .size () == 4 ) {
556+ defaultValueLiteral = args .get (3 ).getLiteral ().getStringValue ();
557+ try {
558+ dataType .convert (defaultValueLiteral );
559+ } catch (Exception e ) {
434560 return null ;
435561 }
436- defaultValue = args .get (3 ).getLiteral ().getStringValue ();
437562 }
438563
439- String filterExpression = null ;
440- if (args .size () == 5 ) {
441- if (args .get (4 ).getType () != ExpressionContext .Type .LITERAL ) {
442- return null ;
443- }
444- filterExpression = args .get (4 ).getLiteral ().getStringValue ();
445- }
446-
447- return new ParsedJsonExtractIndex (columnName , jsonPathString , dataType , defaultValue , filterExpression );
564+ return new ParsedJsonExtractIndex (columnName , jsonPathString , dataType , defaultValueLiteral );
448565 }
449566
450567 private static final class ParsedJsonExtractIndex {
451568 final String _columnName ;
452569 final String _jsonPathString ;
453570 final FieldSpec .DataType _dataType ;
454571 @ Nullable
455- final String _defaultValue ;
456- @ Nullable
457- final String _filterExpression ;
572+ final String _defaultValueLiteral ;
458573
459- ParsedJsonExtractIndex (String columnName , String jsonPathString ,
460- FieldSpec . DataType dataType , @ Nullable String defaultValue , @ Nullable String filterExpression ) {
574+ ParsedJsonExtractIndex (String columnName , String jsonPathString , FieldSpec . DataType dataType ,
575+ @ Nullable String defaultValueLiteral ) {
461576 _columnName = columnName ;
462577 _jsonPathString = jsonPathString ;
463578 _dataType = dataType ;
464- _defaultValue = defaultValue ;
465- _filterExpression = filterExpression ;
579+ _defaultValueLiteral = defaultValueLiteral ;
466580 }
467581 }
468582
@@ -506,9 +620,9 @@ protected void explainAttributes(ExplainAttributeBuilder attributeBuilder) {
506620 }
507621
508622 /**
509- * Returns true if the expression is jsonExtractIndex on a column with JSON index and the path is indexed.
510- * For OSS JSON index all paths are indexed. For composite JSON index, only paths in invertedIndexConfigs
511- * are indexed per key.
623+ * Returns true if the expression is the 3/4-arg scalar jsonExtractIndex form on a column with JSON index and the
624+ * path is indexed. For OSS JSON index all paths are indexed. For composite JSON index, only paths in
625+ * invertedIndexConfigs are indexed per key.
512626 */
513627 public static boolean canUseJsonIndexDistinct (IndexSegment indexSegment , ExpressionContext expr ) {
514628 ParsedJsonExtractIndex parsed = parseJsonExtractIndex (expr );
@@ -526,9 +640,6 @@ public static boolean canUseJsonIndexDistinct(IndexSegment indexSegment, Express
526640 if (!reader .isPathIndexed (parsed ._jsonPathString )) {
527641 return false ;
528642 }
529- // The 5th arg (_filterExpression) is a JSON filter expression, not a plain JSON path,
530- // so isPathIndexed() is not appropriate for it. The reader's getMatchingFlattenedDocsMap()
531- // handles filter expressions internally.
532643 return true ;
533644 }
534645}
0 commit comments