Copilot commented on code in PR #17820: URL: https://github.com/apache/pinot/pull/17820#discussion_r2921426875
########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java: ########## @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.CaseFormat; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.pinot.common.function.JsonPathCache; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.ExplainAttributeBuilder; +import org.apache.pinot.core.operator.blocks.DocIdSetBlock; +import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.query.distinct.table.StringDistinctTable; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.IndexService; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.roaringbitmap.RoaringBitmap; + + +/** + * POC: Distinct operator that uses JSON index value→docId map directly instead of scanning docs. + * Avoids projection/transform pipeline for SELECT DISTINCT jsonExtractIndex(...). + */ +public class JsonIndexDistinctOperator extends BaseOperator<DistinctResultsBlock> { + private static final String EXPLAIN_NAME = "DISTINCT_JSON_INDEX"; + private static final String FUNCTION_NAME = "jsonExtractIndex"; + + private final IndexSegment _indexSegment; + private final SegmentContext _segmentContext; + private final QueryContext _queryContext; + private final BaseFilterOperator _filterOperator; + + private int _numValuesProcessed = 0; + + public JsonIndexDistinctOperator(IndexSegment indexSegment, SegmentContext segmentContext, + QueryContext queryContext, BaseFilterOperator filterOperator) { + _indexSegment = indexSegment; + _segmentContext = segmentContext; + _queryContext = queryContext; + _filterOperator = filterOperator; + } + + @Override + protected DistinctResultsBlock getNextBlock() { + List<ExpressionContext> expressions = _queryContext.getSelectExpressions(); + if (expressions.size() != 1) { + throw new IllegalStateException("JsonIndexDistinctOperator supports single expression only"); + } + + ExpressionContext expr = expressions.get(0); + ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr); + if (parsed == null) { + throw new IllegalStateException("Expected jsonExtractIndex expression"); + } + + JsonIndexReader jsonIndexReader = getJsonIndexReader(parsed._columnName); + if (jsonIndexReader == null) { + throw new IllegalStateException("Column " + parsed._columnName + " has no JSON index"); + } + + // Same logic as JsonExtractIndexTransformFunction.getValueToMatchingDocsMap() + Map<String, RoaringBitmap> valueToMatchingDocs = + jsonIndexReader.getMatchingFlattenedDocsMap(parsed._jsonPathString, parsed._filterJsonPath); + if (parsed._isSingleValue) { + jsonIndexReader.convertFlattenedDocIdsToDocIds(valueToMatchingDocs); + } + + RoaringBitmap filteredDocIds = buildFilteredDocIds(); + + DataSchema dataSchema = new DataSchema( + new String[]{expr.toString()}, + new ColumnDataType[]{ColumnDataType.STRING}); + OrderByExpressionContext orderByExpression = _queryContext.getOrderByExpressions() != null + ? _queryContext.getOrderByExpressions().get(0) : null; + StringDistinctTable distinctTable = new StringDistinctTable( + dataSchema, _queryContext.getLimit(), _queryContext.isNullHandlingEnabled(), orderByExpression); + + int limit = _queryContext.getLimit(); + for (Map.Entry<String, RoaringBitmap> entry : valueToMatchingDocs.entrySet()) { + QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numValuesProcessed, EXPLAIN_NAME); + String value = entry.getKey(); + RoaringBitmap docIds = entry.getValue(); + + boolean includeValue; + if (filteredDocIds == null) { + includeValue = true; + } else { + RoaringBitmap intersection = RoaringBitmap.and(docIds, filteredDocIds); + includeValue = !intersection.isEmpty(); + } + + if (includeValue) { + if (distinctTable.hasLimit()) { + if (orderByExpression != null) { + distinctTable.addWithOrderBy(value); + } else { + if (distinctTable.addWithoutOrderBy(value)) { + break; + } + } + } else { + distinctTable.addUnbounded(value); + } + _numValuesProcessed++; + } + + if (distinctTable.hasLimit() && distinctTable.size() >= limit) { + break; + } + } + + return new DistinctResultsBlock(distinctTable, _queryContext); + } + + @Nullable + private JsonIndexReader getJsonIndexReader(String columnName) { + DataSource dataSource = _indexSegment.getDataSource(columnName, _queryContext.getSchema()); + JsonIndexReader reader = dataSource.getJsonIndex(); + if (reader == null) { + Optional<IndexType<?, ?, ?>> compositeIndex = + IndexService.getInstance().getOptional("composite_json_index"); + if (compositeIndex.isPresent()) { + reader = (JsonIndexReader) dataSource.getIndex(compositeIndex.get()); + } + } + return reader; Review Comment: The logic to locate a `JsonIndexReader` (checking `getJsonIndex()` then falling back to `composite_json_index`) is duplicated between `canUseJsonIndexDistinct()` (lines 297-306) and `getJsonIndexReader()` (lines 148-158). Consider extracting this into a single shared static method to avoid the duplication diverging over time. ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageWithoutStatsIntegrationTest.java: ########## @@ -97,7 +97,7 @@ public void setUp() @Override protected void overrideServerConf(PinotConfiguration serverConf) { serverConf.setProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_SEND_STATS_MODE, - SendStatsPredicate.Mode.NEVER.name()); + "NEVER"); Review Comment: This change from `SendStatsPredicate.Mode.NEVER.name()` to a hardcoded string `"NEVER"` appears unrelated to the PR's purpose (adding index-based distinct operators). If it's needed (e.g. to break a dependency), it should be mentioned in the PR description. Using the enum constant is safer against typos and refactoring. ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java: ########## @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.CaseFormat; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.pinot.common.function.JsonPathCache; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.ExplainAttributeBuilder; +import org.apache.pinot.core.operator.blocks.DocIdSetBlock; +import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.query.distinct.table.StringDistinctTable; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.IndexService; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.roaringbitmap.RoaringBitmap; + + +/** + * POC: Distinct operator that uses JSON index value→docId map directly instead of scanning docs. + * Avoids projection/transform pipeline for SELECT DISTINCT jsonExtractIndex(...). Review Comment: The class Javadoc says "POC" (Proof of Concept). If this is intended for production use (as suggested by the integration tests and query options), the "POC" label should be removed. If it truly is a POC, it may not be ready to merge. ```suggestion * Distinct operator that uses the JSON index value→docId map directly instead of scanning documents. * Avoids the projection/transform pipeline for SELECT DISTINCT jsonExtractIndex(...). ``` ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/InvertedIndexDistinctOperator.java: ########## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.CaseFormat; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.ExplainAttributeBuilder; +import org.apache.pinot.core.operator.blocks.DocIdSetBlock; +import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.query.distinct.table.BigDecimalDistinctTable; +import org.apache.pinot.core.query.distinct.table.BytesDistinctTable; +import org.apache.pinot.core.query.distinct.table.DistinctTable; +import org.apache.pinot.core.query.distinct.table.DoubleDistinctTable; +import org.apache.pinot.core.query.distinct.table.FloatDistinctTable; +import org.apache.pinot.core.query.distinct.table.IntDistinctTable; +import org.apache.pinot.core.query.distinct.table.LongDistinctTable; +import org.apache.pinot.core.query.distinct.table.StringDistinctTable; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; +import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.utils.ByteArray; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + + +/** + * Distinct operator that uses inverted index dictId→docIds map directly instead of scanning docs. + * Supports filter-aware SELECT DISTINCT on columns with inverted index, avoiding projection pipeline. + */ +public class InvertedIndexDistinctOperator extends BaseOperator<DistinctResultsBlock> { + private static final String EXPLAIN_NAME = "DISTINCT_INVERTED_INDEX"; + + private final IndexSegment _indexSegment; + private final SegmentContext _segmentContext; + private final QueryContext _queryContext; + private final BaseFilterOperator _filterOperator; + private final DataSource _dataSource; + private final Dictionary _dictionary; + private final InvertedIndexReader<?> _invertedIndexReader; + + private int _numValuesProcessed = 0; + + public InvertedIndexDistinctOperator(IndexSegment indexSegment, SegmentContext segmentContext, + QueryContext queryContext, BaseFilterOperator filterOperator, DataSource dataSource) { + _indexSegment = indexSegment; + _segmentContext = segmentContext; + _queryContext = queryContext; + _filterOperator = filterOperator; + _dataSource = dataSource; + _dictionary = dataSource.getDictionary(); + _invertedIndexReader = dataSource.getInvertedIndex(); + } + + @Override + protected DistinctResultsBlock getNextBlock() { + List<ExpressionContext> expressions = _queryContext.getSelectExpressions(); + if (expressions.size() != 1) { + throw new IllegalStateException("InvertedIndexDistinctOperator supports single expression only"); + } + + ExpressionContext expr = expressions.get(0); + String column = parseColumnIdentifier(expr); + if (column == null) { + throw new IllegalStateException("InvertedIndexDistinctOperator expects simple column identifier"); + } + + RoaringBitmap filteredDocIds = buildFilteredDocIds(); + DataSourceMetadata dataSourceMetadata = _dataSource.getDataSourceMetadata(); + DataSchema dataSchema = new DataSchema(new String[]{column}, + new ColumnDataType[]{ColumnDataType.fromDataTypeSV(dataSourceMetadata.getDataType())}); + OrderByExpressionContext orderByExpression = + _queryContext.getOrderByExpressions() != null ? _queryContext.getOrderByExpressions().get(0) : null; + + DistinctTable distinctTable = createDistinctTable(dataSchema, orderByExpression); + int dictLength = _dictionary.length(); + int limit = _queryContext.getLimit(); + + for (int dictId = 0; dictId < dictLength; dictId++) { + QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numValuesProcessed, EXPLAIN_NAME); + + Object docIdsObj = _invertedIndexReader.getDocIds(dictId); + if (!(docIdsObj instanceof ImmutableRoaringBitmap)) { + continue; + } + ImmutableRoaringBitmap docIds = (ImmutableRoaringBitmap) docIdsObj; + if (docIds.isEmpty()) { + continue; + } + + boolean includeValue; + if (filteredDocIds == null) { + includeValue = true; + } else { + RoaringBitmap docIdsRoaring = docIds.toMutableRoaringBitmap().toRoaringBitmap(); + RoaringBitmap intersection = RoaringBitmap.and(docIdsRoaring, filteredDocIds); + includeValue = !intersection.isEmpty(); Review Comment: Unnecessary double conversion: `docIds.toMutableRoaringBitmap().toRoaringBitmap()` creates two intermediate bitmap copies. `RoaringBitmap.and()` accepts `ImmutableRoaringBitmap` parameters directly (since `RoaringBitmap` extends `ImmutableRoaringBitmap`), so you can simply check `ImmutableRoaringBitmap.andCardinality(docIds, filteredDocIds) > 0` or use `RoaringBitmap.intersects(docIds.toMutableRoaringBitmap(), filteredDocIds)` to avoid materializing the intersection entirely. This is called for every dictionary entry so the extra allocations could be significant. ```suggestion includeValue = ImmutableRoaringBitmap.andCardinality(docIds, filteredDocIds) > 0; ``` ########## pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java: ########## @@ -36,6 +36,7 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.HashSet; Review Comment: Import `HashSet` is not in alphabetical order relative to the surrounding imports. It should be placed before `LinkedHashMap` and after `Collections` to maintain alphabetical ordering consistent with the rest of the file. ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/JsonIndexDistinctOperator.java: ########## @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.CaseFormat; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import javax.annotation.Nullable; +import org.apache.pinot.common.function.JsonPathCache; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.ExplainAttributeBuilder; +import org.apache.pinot.core.operator.blocks.DocIdSetBlock; +import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.query.distinct.table.StringDistinctTable; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.index.IndexService; +import org.apache.pinot.segment.spi.index.IndexType; +import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.roaringbitmap.RoaringBitmap; + + +/** + * POC: Distinct operator that uses JSON index value→docId map directly instead of scanning docs. + * Avoids projection/transform pipeline for SELECT DISTINCT jsonExtractIndex(...). + */ +public class JsonIndexDistinctOperator extends BaseOperator<DistinctResultsBlock> { + private static final String EXPLAIN_NAME = "DISTINCT_JSON_INDEX"; + private static final String FUNCTION_NAME = "jsonExtractIndex"; + + private final IndexSegment _indexSegment; + private final SegmentContext _segmentContext; + private final QueryContext _queryContext; + private final BaseFilterOperator _filterOperator; + + private int _numValuesProcessed = 0; + + public JsonIndexDistinctOperator(IndexSegment indexSegment, SegmentContext segmentContext, + QueryContext queryContext, BaseFilterOperator filterOperator) { + _indexSegment = indexSegment; + _segmentContext = segmentContext; + _queryContext = queryContext; + _filterOperator = filterOperator; + } + + @Override + protected DistinctResultsBlock getNextBlock() { + List<ExpressionContext> expressions = _queryContext.getSelectExpressions(); + if (expressions.size() != 1) { + throw new IllegalStateException("JsonIndexDistinctOperator supports single expression only"); + } + + ExpressionContext expr = expressions.get(0); + ParsedJsonExtractIndex parsed = parseJsonExtractIndex(expr); + if (parsed == null) { + throw new IllegalStateException("Expected jsonExtractIndex expression"); + } + + JsonIndexReader jsonIndexReader = getJsonIndexReader(parsed._columnName); + if (jsonIndexReader == null) { + throw new IllegalStateException("Column " + parsed._columnName + " has no JSON index"); + } + + // Same logic as JsonExtractIndexTransformFunction.getValueToMatchingDocsMap() + Map<String, RoaringBitmap> valueToMatchingDocs = + jsonIndexReader.getMatchingFlattenedDocsMap(parsed._jsonPathString, parsed._filterJsonPath); + if (parsed._isSingleValue) { + jsonIndexReader.convertFlattenedDocIdsToDocIds(valueToMatchingDocs); + } + + RoaringBitmap filteredDocIds = buildFilteredDocIds(); + + DataSchema dataSchema = new DataSchema( + new String[]{expr.toString()}, + new ColumnDataType[]{ColumnDataType.STRING}); + OrderByExpressionContext orderByExpression = _queryContext.getOrderByExpressions() != null + ? _queryContext.getOrderByExpressions().get(0) : null; + StringDistinctTable distinctTable = new StringDistinctTable( + dataSchema, _queryContext.getLimit(), _queryContext.isNullHandlingEnabled(), orderByExpression); + + int limit = _queryContext.getLimit(); + for (Map.Entry<String, RoaringBitmap> entry : valueToMatchingDocs.entrySet()) { + QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numValuesProcessed, EXPLAIN_NAME); + String value = entry.getKey(); + RoaringBitmap docIds = entry.getValue(); + + boolean includeValue; + if (filteredDocIds == null) { + includeValue = true; + } else { + RoaringBitmap intersection = RoaringBitmap.and(docIds, filteredDocIds); + includeValue = !intersection.isEmpty(); + } + + if (includeValue) { + if (distinctTable.hasLimit()) { + if (orderByExpression != null) { + distinctTable.addWithOrderBy(value); + } else { + if (distinctTable.addWithoutOrderBy(value)) { + break; + } + } + } else { + distinctTable.addUnbounded(value); + } + _numValuesProcessed++; + } + + if (distinctTable.hasLimit() && distinctTable.size() >= limit) { + break; + } + } + + return new DistinctResultsBlock(distinctTable, _queryContext); + } + + @Nullable + private JsonIndexReader getJsonIndexReader(String columnName) { + DataSource dataSource = _indexSegment.getDataSource(columnName, _queryContext.getSchema()); + JsonIndexReader reader = dataSource.getJsonIndex(); + if (reader == null) { + Optional<IndexType<?, ?, ?>> compositeIndex = + IndexService.getInstance().getOptional("composite_json_index"); + if (compositeIndex.isPresent()) { + reader = (JsonIndexReader) dataSource.getIndex(compositeIndex.get()); + } + } + return reader; + } + + @Nullable + private RoaringBitmap buildFilteredDocIds() { + if (_filterOperator.isResultMatchingAll()) { + return null; + } + + if (_filterOperator.canProduceBitmaps()) { + return _filterOperator.getBitmaps().reduce().toRoaringBitmap(); + } + + if (_filterOperator.isResultEmpty()) { + return new RoaringBitmap(); + } + + RoaringBitmap bitmap = new RoaringBitmap(); + DocIdSetPlanNode docIdSetPlanNode = new DocIdSetPlanNode( + _segmentContext, _queryContext, DocIdSetPlanNode.MAX_DOC_PER_CALL, _filterOperator); + var docIdSetOperator = docIdSetPlanNode.run(); + DocIdSetBlock block; + while ((block = docIdSetOperator.nextBlock()) != null) { + int[] docIds = block.getDocIds(); + int length = block.getLength(); + bitmap.addN(docIds, 0, length); + } + return bitmap; + } Review Comment: The `buildFilteredDocIds()` method is duplicated verbatim between `JsonIndexDistinctOperator` and `InvertedIndexDistinctOperator`. Consider extracting this into a shared utility method (e.g., in a common base class or a static helper) to avoid code duplication and make future maintenance easier. ########## pinot-core/src/main/java/org/apache/pinot/core/operator/query/InvertedIndexDistinctOperator.java: ########## @@ -0,0 +1,396 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.query; + +import com.google.common.base.CaseFormat; +import java.util.Collections; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.pinot.common.request.context.ExpressionContext; +import org.apache.pinot.common.request.context.OrderByExpressionContext; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.BaseOperator; +import org.apache.pinot.core.operator.ExecutionStatistics; +import org.apache.pinot.core.operator.ExplainAttributeBuilder; +import org.apache.pinot.core.operator.blocks.DocIdSetBlock; +import org.apache.pinot.core.operator.blocks.results.DistinctResultsBlock; +import org.apache.pinot.core.operator.filter.BaseFilterOperator; +import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.query.distinct.table.BigDecimalDistinctTable; +import org.apache.pinot.core.query.distinct.table.BytesDistinctTable; +import org.apache.pinot.core.query.distinct.table.DistinctTable; +import org.apache.pinot.core.query.distinct.table.DoubleDistinctTable; +import org.apache.pinot.core.query.distinct.table.FloatDistinctTable; +import org.apache.pinot.core.query.distinct.table.IntDistinctTable; +import org.apache.pinot.core.query.distinct.table.LongDistinctTable; +import org.apache.pinot.core.query.distinct.table.StringDistinctTable; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; +import org.apache.pinot.segment.spi.index.reader.NullValueVectorReader; +import org.apache.pinot.spi.query.QueryThreadContext; +import org.apache.pinot.spi.utils.ByteArray; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + + +/** + * Distinct operator that uses inverted index dictId→docIds map directly instead of scanning docs. + * Supports filter-aware SELECT DISTINCT on columns with inverted index, avoiding projection pipeline. + */ +public class InvertedIndexDistinctOperator extends BaseOperator<DistinctResultsBlock> { + private static final String EXPLAIN_NAME = "DISTINCT_INVERTED_INDEX"; + + private final IndexSegment _indexSegment; + private final SegmentContext _segmentContext; + private final QueryContext _queryContext; + private final BaseFilterOperator _filterOperator; + private final DataSource _dataSource; + private final Dictionary _dictionary; + private final InvertedIndexReader<?> _invertedIndexReader; + + private int _numValuesProcessed = 0; + + public InvertedIndexDistinctOperator(IndexSegment indexSegment, SegmentContext segmentContext, + QueryContext queryContext, BaseFilterOperator filterOperator, DataSource dataSource) { + _indexSegment = indexSegment; + _segmentContext = segmentContext; + _queryContext = queryContext; + _filterOperator = filterOperator; + _dataSource = dataSource; + _dictionary = dataSource.getDictionary(); + _invertedIndexReader = dataSource.getInvertedIndex(); + } + + @Override + protected DistinctResultsBlock getNextBlock() { + List<ExpressionContext> expressions = _queryContext.getSelectExpressions(); + if (expressions.size() != 1) { + throw new IllegalStateException("InvertedIndexDistinctOperator supports single expression only"); + } + + ExpressionContext expr = expressions.get(0); + String column = parseColumnIdentifier(expr); + if (column == null) { + throw new IllegalStateException("InvertedIndexDistinctOperator expects simple column identifier"); + } + + RoaringBitmap filteredDocIds = buildFilteredDocIds(); + DataSourceMetadata dataSourceMetadata = _dataSource.getDataSourceMetadata(); + DataSchema dataSchema = new DataSchema(new String[]{column}, + new ColumnDataType[]{ColumnDataType.fromDataTypeSV(dataSourceMetadata.getDataType())}); + OrderByExpressionContext orderByExpression = + _queryContext.getOrderByExpressions() != null ? _queryContext.getOrderByExpressions().get(0) : null; + + DistinctTable distinctTable = createDistinctTable(dataSchema, orderByExpression); + int dictLength = _dictionary.length(); + int limit = _queryContext.getLimit(); + + for (int dictId = 0; dictId < dictLength; dictId++) { + QueryThreadContext.checkTerminationAndSampleUsagePeriodically(_numValuesProcessed, EXPLAIN_NAME); + + Object docIdsObj = _invertedIndexReader.getDocIds(dictId); + if (!(docIdsObj instanceof ImmutableRoaringBitmap)) { + continue; + } + ImmutableRoaringBitmap docIds = (ImmutableRoaringBitmap) docIdsObj; + if (docIds.isEmpty()) { + continue; + } + + boolean includeValue; + if (filteredDocIds == null) { + includeValue = true; + } else { + RoaringBitmap docIdsRoaring = docIds.toMutableRoaringBitmap().toRoaringBitmap(); + RoaringBitmap intersection = RoaringBitmap.and(docIdsRoaring, filteredDocIds); + includeValue = !intersection.isEmpty(); + } + + if (includeValue) { + boolean done = addValueToDistinctTable(distinctTable, dictId, orderByExpression); + _numValuesProcessed++; + if (done) { + break; + } + } + + if (distinctTable.hasLimit() && distinctTable.size() >= limit) { + break; + } + } + + return new DistinctResultsBlock(distinctTable, _queryContext); + } + + private DistinctTable createDistinctTable(DataSchema dataSchema, + @Nullable OrderByExpressionContext orderByExpression) { + int limit = _queryContext.getLimit(); + switch (_dictionary.getValueType()) { + case INT: + return new IntDistinctTable(dataSchema, limit, _queryContext.isNullHandlingEnabled(), orderByExpression); + case LONG: + return new LongDistinctTable(dataSchema, limit, _queryContext.isNullHandlingEnabled(), orderByExpression); + case FLOAT: + return new FloatDistinctTable(dataSchema, limit, _queryContext.isNullHandlingEnabled(), orderByExpression); + case DOUBLE: + return new DoubleDistinctTable(dataSchema, limit, _queryContext.isNullHandlingEnabled(), orderByExpression); + case BIG_DECIMAL: + return new BigDecimalDistinctTable(dataSchema, limit, _queryContext.isNullHandlingEnabled(), orderByExpression); + case STRING: + return new StringDistinctTable(dataSchema, limit, _queryContext.isNullHandlingEnabled(), orderByExpression); + case BYTES: + return new BytesDistinctTable(dataSchema, limit, _queryContext.isNullHandlingEnabled(), orderByExpression); + default: + throw new IllegalStateException("Unsupported data type: " + _dictionary.getValueType()); + } + } + + private boolean addValueToDistinctTable(DistinctTable distinctTable, int dictId, + @Nullable OrderByExpressionContext orderByExpression) { + switch (_dictionary.getValueType()) { + case INT: + return addToTable((IntDistinctTable) distinctTable, _dictionary.getIntValue(dictId), orderByExpression); + case LONG: + return addToTable((LongDistinctTable) distinctTable, _dictionary.getLongValue(dictId), orderByExpression); + case FLOAT: + return addToTable((FloatDistinctTable) distinctTable, _dictionary.getFloatValue(dictId), orderByExpression); + case DOUBLE: + return addToTable((DoubleDistinctTable) distinctTable, _dictionary.getDoubleValue(dictId), orderByExpression); + case BIG_DECIMAL: + return addToTable((BigDecimalDistinctTable) distinctTable, _dictionary.getBigDecimalValue(dictId), + orderByExpression); + case STRING: + return addToTable((StringDistinctTable) distinctTable, _dictionary.getStringValue(dictId), orderByExpression); + case BYTES: + return addToTable((BytesDistinctTable) distinctTable, _dictionary.getByteArrayValue(dictId), orderByExpression); + default: + throw new IllegalStateException("Unsupported data type: " + _dictionary.getValueType()); + } + } + + private static boolean addToTable(IntDistinctTable table, int value, + @Nullable OrderByExpressionContext orderByExpression) { + if (table.hasLimit()) { + if (orderByExpression != null) { + table.addWithOrderBy(value); + return false; + } else { + return table.addWithoutOrderBy(value); + } + } else { + table.addUnbounded(value); + return false; + } + } + + private static boolean addToTable(LongDistinctTable table, long value, + @Nullable OrderByExpressionContext orderByExpression) { + if (table.hasLimit()) { + if (orderByExpression != null) { + table.addWithOrderBy(value); + return false; + } else { + return table.addWithoutOrderBy(value); + } + } else { + table.addUnbounded(value); + return false; + } + } + + private static boolean addToTable(FloatDistinctTable table, float value, + @Nullable OrderByExpressionContext orderByExpression) { + if (table.hasLimit()) { + if (orderByExpression != null) { + table.addWithOrderBy(value); + return false; + } else { + return table.addWithoutOrderBy(value); + } + } else { + table.addUnbounded(value); + return false; + } + } + + private static boolean addToTable(DoubleDistinctTable table, double value, + @Nullable OrderByExpressionContext orderByExpression) { + if (table.hasLimit()) { + if (orderByExpression != null) { + table.addWithOrderBy(value); + return false; + } else { + return table.addWithoutOrderBy(value); + } + } else { + table.addUnbounded(value); + return false; + } + } + + private static boolean addToTable(BigDecimalDistinctTable table, java.math.BigDecimal value, + @Nullable OrderByExpressionContext orderByExpression) { + if (table.hasLimit()) { + if (orderByExpression != null) { + table.addWithOrderBy(value); + return false; + } else { + return table.addWithoutOrderBy(value); + } + } else { + table.addUnbounded(value); + return false; + } + } + + private static boolean addToTable(StringDistinctTable table, String value, + @Nullable OrderByExpressionContext orderByExpression) { + if (table.hasLimit()) { + if (orderByExpression != null) { + table.addWithOrderBy(value); + return false; + } else { + return table.addWithoutOrderBy(value); + } + } else { + table.addUnbounded(value); + return false; + } + } + + private static boolean addToTable(BytesDistinctTable table, ByteArray value, + @Nullable OrderByExpressionContext orderByExpression) { + if (table.hasLimit()) { + if (orderByExpression != null) { + table.addWithOrderBy(value); + return false; + } else { + return table.addWithoutOrderBy(value); + } + } else { + table.addUnbounded(value); + return false; + } + } Review Comment: The seven `addToTable` overloads (lines 194-297) are nearly identical, differing only in the value type. This is a significant amount of duplicated logic. Consider using a generic helper or consolidating via the `DistinctTable` base class (or a templated approach) to reduce boilerplate. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
