This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 41f90f9ef8 Add APIs to IndexSegment as a preparation to support virtual DataSource (#15869) 41f90f9ef8 is described below commit 41f90f9ef81d6d91205df95c2685a3abbb0d4f89 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu May 22 18:19:34 2025 -0600 Add APIs to IndexSegment as a preparation to support virtual DataSource (#15869) --- ...xValueBasedSelectionOrderByCombineOperator.java | 8 +- .../operator/filter/ExpressionFilterOperator.java | 2 +- .../operator/query/SelectionOrderByOperator.java | 2 +- .../pinot/core/plan/AggregationPlanNode.java | 13 +-- .../apache/pinot/core/plan/DistinctPlanNode.java | 2 +- .../org/apache/pinot/core/plan/FilterPlanNode.java | 20 ++-- .../apache/pinot/core/plan/ProjectPlanNode.java | 3 +- .../apache/pinot/core/plan/SelectionPlanNode.java | 2 +- .../query/executor/LogicalTableExecutionInfo.java | 7 ++ .../query/executor/ServerQueryExecutorV1Impl.java | 2 + .../query/executor/SingleTableExecutionInfo.java | 24 +++-- .../core/query/executor/TableExecutionInfo.java | 5 + .../core/query/prefetch/DefaultFetchPlanner.java | 4 +- .../core/query/request/context/QueryContext.java | 11 ++ .../plan/maker/QueryOverrideWithHintsTest.java | 113 +++++---------------- .../indexsegment/immutable/EmptyIndexSegment.java | 17 ++-- .../immutable/ImmutableSegmentImpl.java | 15 ++- .../indexsegment/mutable/MutableSegmentImpl.java | 11 +- .../org/apache/pinot/segment/spi/IndexSegment.java | 29 ++++-- 19 files changed, 143 insertions(+), 147 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java index 776a838167..e9a513ad9e 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/MinMaxValueBasedSelectionOrderByCombineOperator.java @@ -36,6 +36,7 @@ import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.segment.spi.datasource.DataSourceMetadata; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.exception.QueryErrorCode; import org.apache.pinot.spi.exception.QueryErrorMessage; import org.slf4j.Logger; @@ -85,7 +86,7 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator _minMaxValueContexts = new ArrayList<>(_numOperators); for (Operator<BaseResultsBlock> operator : _operators) { - _minMaxValueContexts.add(new MinMaxValueContext(operator, firstOrderByColumn)); + _minMaxValueContexts.add(new MinMaxValueContext(operator, firstOrderByColumn, queryContext.getSchema())); } if (firstOrderByExpression.isAsc()) { // For ascending order, sort on column min value in ascending order @@ -313,9 +314,10 @@ public class MinMaxValueBasedSelectionOrderByCombineOperator final Comparable _minValue; final Comparable _maxValue; - MinMaxValueContext(Operator<BaseResultsBlock> operator, String column) { + MinMaxValueContext(Operator<BaseResultsBlock> operator, String column, Schema schema) { _operator = operator; - DataSourceMetadata dataSourceMetadata = operator.getIndexSegment().getDataSource(column).getDataSourceMetadata(); + DataSourceMetadata dataSourceMetadata = + operator.getIndexSegment().getDataSource(column, schema).getDataSourceMetadata(); _minValue = dataSourceMetadata.getMinValue(); _maxValue = dataSourceMetadata.getMaxValue(); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java index 6346be2372..27889f8ec3 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/ExpressionFilterOperator.java @@ -64,7 +64,7 @@ public class ExpressionFilterOperator extends BaseFilterOperator { _dataSourceMap = new HashMap<>(mapCapacity); Map<String, ColumnContext> columnContextMap = new HashMap<>(mapCapacity); columns.forEach(column -> { - DataSource dataSource = segment.getDataSource(column); + DataSource dataSource = segment.getDataSource(column, queryContext.getSchema()); _dataSourceMap.put(column, dataSource); columnContextMap.put(column, ColumnContext.fromDataSource(dataSource)); }); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java index 95bf2a2e66..3321a3ae14 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/SelectionOrderByOperator.java @@ -279,7 +279,7 @@ public class SelectionOrderByOperator extends BaseOperator<SelectionResultsBlock int numColumns = columns.size(); Map<String, DataSource> dataSourceMap = new HashMap<>(); for (String column : columns) { - dataSourceMap.put(column, _indexSegment.getDataSource(column)); + dataSourceMap.put(column, _indexSegment.getDataSource(column, _queryContext.getSchema())); } try (ProjectionOperator projectionOperator = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java index cca14f2704..7db0aefd29 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java @@ -112,13 +112,13 @@ public class AggregationPlanNode implements PlanNode { boolean hasNullValues = _queryContext.isNullHandlingEnabled() && hasNullValues(aggregationFunctions); if (!hasNullValues) { // Priority 2: Check if non-scan based aggregation is feasible - if (filterOperator.isResultMatchingAll() && isFitForNonScanBasedPlan(aggregationFunctions, _indexSegment)) { + if (filterOperator.isResultMatchingAll() && isFitForNonScanBasedPlan()) { DataSource[] dataSources = new DataSource[aggregationFunctions.length]; for (int i = 0; i < aggregationFunctions.length; i++) { List<?> inputExpressions = aggregationFunctions[i].getInputExpressions(); if (!inputExpressions.isEmpty()) { String column = ((ExpressionContext) inputExpressions.get(0)).getIdentifier(); - dataSources[i] = _indexSegment.getDataSource(column); + dataSources[i] = _indexSegment.getDataSource(column, _queryContext.getSchema()); } } return new NonScanBasedAggregationOperator(_queryContext, dataSources, numTotalDocs); @@ -148,7 +148,7 @@ public class AggregationPlanNode implements PlanNode { for (ExpressionContext argument : aggregationFunction.getInputExpressions()) { switch (argument.getType()) { case IDENTIFIER: - DataSource dataSource = _indexSegment.getDataSource(argument.getIdentifier()); + DataSource dataSource = _indexSegment.getDataSource(argument.getIdentifier(), _queryContext.getSchema()); NullValueVectorReader nullValueVector = dataSource.getNullValueVector(); if (nullValueVector != null && !nullValueVector.getNullBitmap().isEmpty()) { return true; @@ -172,8 +172,9 @@ public class AggregationPlanNode implements PlanNode { * Returns {@code true} if the given aggregations can be solved with dictionary or column metadata, {@code false} * otherwise. 
*/ - private static boolean isFitForNonScanBasedPlan(AggregationFunction[] aggregationFunctions, - IndexSegment indexSegment) { + private boolean isFitForNonScanBasedPlan() { + AggregationFunction[] aggregationFunctions = _queryContext.getAggregationFunctions(); + assert aggregationFunctions != null; for (AggregationFunction<?, ?> aggregationFunction : aggregationFunctions) { if (aggregationFunction.getType() == COUNT) { continue; @@ -182,7 +183,7 @@ public class AggregationPlanNode implements PlanNode { if (argument.getType() != ExpressionContext.Type.IDENTIFIER) { return false; } - DataSource dataSource = indexSegment.getDataSource(argument.getIdentifier()); + DataSource dataSource = _indexSegment.getDataSource(argument.getIdentifier(), _queryContext.getSchema()); if (DICTIONARY_BASED_FUNCTIONS.contains(aggregationFunction.getType())) { if (dataSource.getDictionary() != null) { continue; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java index 44ffcd98aa..fae9b8efad 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java @@ -54,7 +54,7 @@ public class DistinctPlanNode implements PlanNode { if (_queryContext.getFilter() == null && expressions.size() == 1) { String column = expressions.get(0).getIdentifier(); if (column != null) { - DataSource dataSource = _indexSegment.getDataSource(column); + DataSource dataSource = _indexSegment.getDataSource(column, _queryContext.getSchema()); if (dataSource.getDictionary() != null) { if (!_queryContext.isNullHandlingEnabled()) { return new DictionaryBasedDistinctOperator(dataSource, _queryContext); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java index 73f148c9c4..805c70a1c2 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/FilterPlanNode.java @@ -148,8 +148,12 @@ public class FilterPlanNode implements PlanNode { findLiteral = true; } } - return columnName != null && _indexSegment.getDataSource(columnName).getH3Index() != null && findLiteral - && _queryContext.isIndexUseAllowed(columnName, FieldConfig.IndexType.H3); + if (columnName == null || !findLiteral) { + return false; + } + DataSource dataSource = _indexSegment.getDataSourceNullable(columnName); + return dataSource != null && dataSource.getH3Index() != null && _queryContext.isIndexUseAllowed(columnName, + FieldConfig.IndexType.H3); } /** @@ -179,16 +183,18 @@ public class FilterPlanNode implements PlanNode { if (arguments.get(0).getType() == ExpressionContext.Type.IDENTIFIER && arguments.get(1).getType() == ExpressionContext.Type.LITERAL) { String columnName = arguments.get(0).getIdentifier(); - return _indexSegment.getDataSource(columnName).getH3Index() != null - && _queryContext.isIndexUseAllowed(columnName, FieldConfig.IndexType.H3); + DataSource dataSource = _indexSegment.getDataSourceNullable(columnName); + return dataSource != null && dataSource.getH3Index() != null && _queryContext.isIndexUseAllowed(columnName, + FieldConfig.IndexType.H3); } return false; } else { if (arguments.get(1).getType() == ExpressionContext.Type.IDENTIFIER && arguments.get(0).getType() == ExpressionContext.Type.LITERAL) { String columnName = arguments.get(1).getIdentifier(); - return _indexSegment.getDataSource(columnName).getH3Index() != null - && _queryContext.isIndexUseAllowed(columnName, FieldConfig.IndexType.H3); + DataSource dataSource = _indexSegment.getDataSourceNullable(columnName); + return dataSource != null && dataSource.getH3Index() != null && _queryContext.isIndexUseAllowed(columnName, + FieldConfig.IndexType.H3); } return false; } @@ -256,7 +262,7 @@ public class FilterPlanNode implements PlanNode { } 
} else { String column = lhs.getIdentifier(); - DataSource dataSource = _indexSegment.getDataSource(column); + DataSource dataSource = _indexSegment.getDataSource(column, _queryContext.getSchema()); PredicateEvaluator predicateEvaluator; switch (predicate.getType()) { case TEXT_CONTAINS: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java index fdd47ba717..d9b4cb8aa8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/ProjectPlanNode.java @@ -75,7 +75,8 @@ public class ProjectPlanNode implements PlanNode { } } Map<String, DataSource> dataSourceMap = new HashMap<>(HashUtil.getHashMapCapacity(projectionColumns.size())); - projectionColumns.forEach(column -> dataSourceMap.put(column, _indexSegment.getDataSource(column))); + projectionColumns.forEach( + column -> dataSourceMap.put(column, _indexSegment.getDataSource(column, _queryContext.getSchema()))); // NOTE: Skip creating DocIdSetOperator when maxDocsPerCall is 0 (for selection query with LIMIT 0) DocIdSetOperator docIdSetOperator = _maxDocsPerCall > 0 ? 
new DocIdSetPlanNode(_segmentContext, _queryContext, _maxDocsPerCall, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java index e936cd6949..d59345e184 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/SelectionPlanNode.java @@ -158,7 +158,7 @@ public class SelectionPlanNode implements PlanNode { return false; } String column = orderByExpression.getExpression().getIdentifier(); - DataSource dataSource = _indexSegment.getDataSource(column); + DataSource dataSource = _indexSegment.getDataSource(column, _queryContext.getSchema()); // If there are null values, we cannot trust DataSourceMetadata.isSorted if (isNullHandlingEnabled) { NullValueVectorReader nullValueVector = dataSource.getNullValueVector(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java index 3a9c63f599..cc65a51852 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/LogicalTableExecutionInfo.java @@ -37,6 +37,7 @@ import org.apache.pinot.core.query.request.context.TimerContext; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,6 +67,12 @@ public class LogicalTableExecutionInfo implements TableExecutionInfo { _tableExecutionInfos = tableExecutionInfos; } + @Override + public Schema getSchema() { + // TODO: Return the schema of the logical table + return _tableExecutionInfos.get(0).getSchema(); + } + 
@Override public boolean hasRealtime() { return _tableExecutionInfos.stream() diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index aca9b861f9..97ee75dedb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -216,6 +216,8 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { .collect(Collectors.toList())); } + queryContext.setSchema(executionInfo.getSchema()); + // Gather stats for realtime consuming segments // TODO: the freshness time should not be collected at query time because there is no guarantee that the consuming // segment is queried (consuming segment might be pruned, or the server only contains relocated committed diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java index 2439dcbc35..aba2be49ad 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/SingleTableExecutionInfo.java @@ -45,6 +45,7 @@ import org.apache.pinot.segment.spi.MutableSegment; import org.apache.pinot.segment.spi.SegmentContext; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.table.UpsertConfig; +import org.apache.pinot.spi.data.Schema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,18 +152,18 @@ public class SingleTableExecutionInfo implements TableExecutionInfo { _notAcquiredSegments = notAcquiredSegments; } - @Override - public boolean hasRealtime() { - return _tableDataManager instanceof RealtimeTableDataManager; - } - - public TableDataManager 
getTableDataManager() { return _tableDataManager; } - public List<SegmentDataManager> getSegmentDataManagers() { - return _segmentDataManagers; + @Override + public Schema getSchema() { + return _tableDataManager.getCachedTableConfigAndSchema().getRight(); + } + + @Override + public boolean hasRealtime() { + return _tableDataManager instanceof RealtimeTableDataManager; } @Override @@ -189,14 +190,21 @@ public class SingleTableExecutionInfo implements TableExecutionInfo { return _providedSegmentContexts; } + @Override public List<String> getSegmentsToQuery() { return _segmentsToQuery; } + @Override public List<String> getOptionalSegments() { return _optionalSegments; } + @Override + public List<SegmentDataManager> getSegmentDataManagers() { + return _segmentDataManagers; + } + @Override public List<String> getNotAcquiredSegments() { return _notAcquiredSegments; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java index a820bade0d..6f172ecc72 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/TableExecutionInfo.java @@ -30,9 +30,14 @@ import org.apache.pinot.core.query.request.context.TimerContext; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentContext; +import org.apache.pinot.spi.data.Schema; public interface TableExecutionInfo { + + /// Returns the latest [Schema] for the table. + Schema getSchema(); + /** * Check if consuming segments are being queried. 
* @return true if consuming segments are being queried, false otherwise diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/prefetch/DefaultFetchPlanner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/prefetch/DefaultFetchPlanner.java index d198fb97e0..2cfcbcc9c7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/prefetch/DefaultFetchPlanner.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/prefetch/DefaultFetchPlanner.java @@ -48,8 +48,8 @@ public class DefaultFetchPlanner implements FetchPlanner { extractEqInColumns(Objects.requireNonNull(queryContext.getFilter()), eqInColumns); Map<String, List<IndexType<?, ?, ?>>> columnToIndexList = new HashMap<>(); for (String column : eqInColumns) { - DataSource dataSource = indexSegment.getDataSource(column); - if (dataSource.getBloomFilter() != null) { + DataSource dataSource = indexSegment.getDataSourceNullable(column); + if (dataSource != null && dataSource.getBloomFilter() != null) { columnToIndexList.put(column, Collections.singletonList(StandardIndexes.bloomFilter())); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java index b1144e6044..d5f77b84fa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java @@ -42,6 +42,7 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFacto import org.apache.pinot.core.util.MemoizedClassAssociation; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.spi.config.table.FieldConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants.Server; @@ -98,6 +99,8 @@ public class QueryContext { private Set<String> _columns; // Other properties to be shared across all the 
segments + // Latest table schema at query time + private Schema _schema; // End time in milliseconds for the query private long _endTimeMs; // Whether to enable prefetch for the query @@ -314,6 +317,14 @@ public class QueryContext { return _columns; } + public Schema getSchema() { + return _schema; + } + + public void setSchema(Schema schema) { + _schema = schema; + } + public long getEndTimeMs() { return _endTimeMs; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java index c6750af635..274b891b12 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/QueryOverrideWithHintsTest.java @@ -18,14 +18,10 @@ */ package org.apache.pinot.core.plan.maker; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; -import javax.annotation.Nullable; import org.apache.pinot.common.request.Expression; import org.apache.pinot.common.request.PinotQuery; import org.apache.pinot.common.request.context.ExpressionContext; @@ -37,82 +33,26 @@ import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.segment.spi.IndexSegment; -import org.apache.pinot.segment.spi.SegmentMetadata; -import org.apache.pinot.segment.spi.datasource.DataSource; -import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; -import org.apache.pinot.segment.spi.index.startree.StarTreeV2; import org.apache.pinot.spi.data.FieldSpec; -import org.apache.pinot.spi.data.readers.GenericRow; import 
org.apache.pinot.sql.parsers.CalciteSqlParser; +import org.mockito.Mockito; +import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; public class QueryOverrideWithHintsTest { - private final IndexSegment _indexSegment = new IndexSegment() { - @Override - public String getSegmentName() { - return null; - } + private IndexSegment _indexSegment; - @Override - public SegmentMetadata getSegmentMetadata() { - return null; - } - - @Override - public Set<String> getColumnNames() { - return ImmutableSet.of("$ts$MONTH"); - } - - @Override - public Set<String> getPhysicalColumnNames() { - return null; - } - - @Override - public DataSource getDataSource(String columnName) { - return null; - } - - @Override - public List<StarTreeV2> getStarTrees() { - return null; - } - - @Nullable - @Override - public ThreadSafeMutableRoaringBitmap getValidDocIds() { - return null; - } - - @Nullable - @Override - public ThreadSafeMutableRoaringBitmap getQueryableDocIds() { - return null; - } - - @Override - public GenericRow getRecord(int docId, GenericRow reuse) { - return null; - } - - @Override - public Object getValue(int docId, String column) { - return null; - } - - @Override - public void offload() { - } - - @Override - public void destroy() { - } - }; + @BeforeClass + public void setUp() { + _indexSegment = mock(IndexSegment.class); + Mockito.when(_indexSegment.getColumnNames()).thenReturn(Set.of("$ts$MONTH")); + } @Test public void testExpressionContextHashcode() { @@ -138,19 +78,19 @@ public class QueryOverrideWithHintsTest { assertNotEquals(expressionContext1.hashCode(), expressionContext2.hashCode()); expressionContext1 = ExpressionContext.forFunction(new FunctionContext(FunctionContext.Type.TRANSFORM, "func1", - ImmutableList.of(ExpressionContext.forIdentifier("abc"), + List.of(ExpressionContext.forIdentifier("abc"), 
ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "abc")))); expressionContext2 = ExpressionContext.forFunction(new FunctionContext(FunctionContext.Type.TRANSFORM, "func1", - ImmutableList.of(ExpressionContext.forIdentifier("abc"), + List.of(ExpressionContext.forIdentifier("abc"), ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "abc")))); assertEquals(expressionContext1, expressionContext2); assertEquals(expressionContext1.hashCode(), expressionContext2.hashCode()); expressionContext1 = ExpressionContext.forFunction(new FunctionContext(FunctionContext.Type.TRANSFORM, "datetrunc", - ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"), + List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"), ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "event_time_ts")))); expressionContext2 = ExpressionContext.forFunction(new FunctionContext(FunctionContext.Type.TRANSFORM, "datetrunc", - ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"), + List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"), ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "event_time_ts")))); assertEquals(expressionContext1, expressionContext2); assertEquals(expressionContext1.hashCode(), expressionContext2.hashCode()); @@ -160,15 +100,14 @@ public class QueryOverrideWithHintsTest { public void testOverrideFilterWithExpressionOverrideHints() { ExpressionContext dateTruncFunctionExpr = ExpressionContext.forFunction( new FunctionContext(FunctionContext.Type.TRANSFORM, "dateTrunc", new ArrayList<>(new ArrayList<>( - ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "MONTH"), + List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "MONTH"), ExpressionContext.forIdentifier("ts")))))); ExpressionContext timestampIndexColumn = ExpressionContext.forIdentifier("$ts$MONTH"); ExpressionContext equalsExpression = ExpressionContext.forFunction( new 
FunctionContext(FunctionContext.Type.TRANSFORM, "EQUALS", new ArrayList<>( - ImmutableList.of(dateTruncFunctionExpr, - ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000))))); + List.of(dateTruncFunctionExpr, ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000))))); FilterContext filter = RequestContextUtils.getFilter(equalsExpression); - Map<ExpressionContext, ExpressionContext> hints = ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn); + Map<ExpressionContext, ExpressionContext> hints = Map.of(dateTruncFunctionExpr, timestampIndexColumn); InstancePlanMakerImplV2.overrideWithExpressionHints(filter, _indexSegment, hints); assertEquals(filter.getType(), FilterContext.Type.PREDICATE); assertEquals(filter.getPredicate().getLhs(), timestampIndexColumn); @@ -176,7 +115,7 @@ public class QueryOverrideWithHintsTest { FilterContext andFilter = RequestContextUtils.getFilter(ExpressionContext.forFunction( new FunctionContext(FunctionContext.Type.TRANSFORM, "AND", - new ArrayList<>(ImmutableList.of(equalsExpression, equalsExpression))))); + new ArrayList<>(List.of(equalsExpression, equalsExpression))))); InstancePlanMakerImplV2.overrideWithExpressionHints(andFilter, _indexSegment, hints); assertEquals(andFilter.getChildren().get(0).getPredicate().getLhs(), timestampIndexColumn); assertEquals(andFilter.getChildren().get(1).getPredicate().getLhs(), timestampIndexColumn); @@ -186,14 +125,13 @@ public class QueryOverrideWithHintsTest { public void testOverrideWithExpressionOverrideHints() { ExpressionContext dateTruncFunctionExpr = ExpressionContext.forFunction( new FunctionContext(FunctionContext.Type.TRANSFORM, "dateTrunc", new ArrayList<>( - ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "MONTH"), + List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "MONTH"), ExpressionContext.forIdentifier("ts"))))); ExpressionContext timestampIndexColumn = ExpressionContext.forIdentifier("$ts$MONTH"); ExpressionContext 
equalsExpression = ExpressionContext.forFunction( new FunctionContext(FunctionContext.Type.TRANSFORM, "EQUALS", new ArrayList<>( - ImmutableList.of(dateTruncFunctionExpr, - ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000))))); - Map<ExpressionContext, ExpressionContext> hints = ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn); + List.of(dateTruncFunctionExpr, ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000))))); + Map<ExpressionContext, ExpressionContext> hints = Map.of(dateTruncFunctionExpr, timestampIndexColumn); ExpressionContext newEqualsExpression = InstancePlanMakerImplV2.overrideWithExpressionHints(equalsExpression, _indexSegment, hints); assertEquals(newEqualsExpression.getFunction().getFunctionName(), "equals"); @@ -206,14 +144,13 @@ public class QueryOverrideWithHintsTest { public void testNotOverrideWithExpressionOverrideHints() { ExpressionContext dateTruncFunctionExpr = ExpressionContext.forFunction( new FunctionContext(FunctionContext.Type.TRANSFORM, "dateTrunc", new ArrayList<>( - ImmutableList.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"), + List.of(ExpressionContext.forLiteral(FieldSpec.DataType.STRING, "DAY"), ExpressionContext.forIdentifier("ts"))))); ExpressionContext timestampIndexColumn = ExpressionContext.forIdentifier("$ts$DAY"); ExpressionContext equalsExpression = ExpressionContext.forFunction( new FunctionContext(FunctionContext.Type.TRANSFORM, "EQUALS", new ArrayList<>( - ImmutableList.of(dateTruncFunctionExpr, - ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000))))); - Map<ExpressionContext, ExpressionContext> hints = ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn); + List.of(dateTruncFunctionExpr, ExpressionContext.forLiteral(FieldSpec.DataType.INT, 1000))))); + Map<ExpressionContext, ExpressionContext> hints = Map.of(dateTruncFunctionExpr, timestampIndexColumn); ExpressionContext newEqualsExpression = 
InstancePlanMakerImplV2.overrideWithExpressionHints(equalsExpression, _indexSegment, hints); assertEquals(newEqualsExpression.getFunction().getFunctionName(), "equals"); @@ -231,7 +168,7 @@ public class QueryOverrideWithHintsTest { RequestUtils.getFunctionExpression("datetrunc", RequestUtils.getLiteralExpression("MONTH"), RequestUtils.getIdentifierExpression("ts")); Expression timestampIndexColumn = RequestUtils.getIdentifierExpression("$ts$MONTH"); - pinotQuery.setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn)); + pinotQuery.setExpressionOverrideHints(Map.of(dateTruncFunctionExpr, timestampIndexColumn)); QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); InstancePlanMakerImplV2.rewriteQueryContextWithHints(queryContext, _indexSegment); assertEquals(queryContext.getSelectExpressions().get(0).getIdentifier(), "$ts$MONTH"); @@ -246,7 +183,7 @@ public class QueryOverrideWithHintsTest { RequestUtils.getFunctionExpression("datetrunc", RequestUtils.getLiteralExpression("DAY"), RequestUtils.getIdentifierExpression("ts")); Expression timestampIndexColumn = RequestUtils.getIdentifierExpression("$ts$DAY"); - pinotQuery.setExpressionOverrideHints(ImmutableMap.of(dateTruncFunctionExpr, timestampIndexColumn)); + pinotQuery.setExpressionOverrideHints(Map.of(dateTruncFunctionExpr, timestampIndexColumn)); QueryContext queryContext = QueryContextConverterUtils.getQueryContext(pinotQuery); InstancePlanMakerImplV2.rewriteQueryContextWithHints(queryContext, _indexSegment); assertEquals(queryContext.getSelectExpressions().get(0).getFunction(), diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java index c4374829c3..b66edb8a0b 100644 --- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.segment.local.indexsegment.immutable; -import com.google.common.base.Preconditions; import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -58,14 +57,6 @@ public class EmptyIndexSegment implements ImmutableSegment { return _segmentMetadata; } - @Override - public DataSource getDataSource(String column) { - ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column); - Preconditions.checkNotNull(columnMetadata, - "ColumnMetadata for " + column + " should not be null. " + "Potentially invalid column name specified."); - return new EmptyDataSource(columnMetadata); - } - @Override public Set<String> getColumnNames() { return _segmentMetadata.getSchema().getColumnNames(); @@ -84,6 +75,14 @@ public class EmptyIndexSegment implements ImmutableSegment { public void destroy() { } + @Nullable + @Override + public DataSource getDataSourceNullable(String column) { + ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column); + return columnMetadata != null ? 
new EmptyDataSource(columnMetadata) : null; + } + + @Nullable @Override public List<StarTreeV2> getStarTrees() { return null; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java index e5183f4479..28ea561e8a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java @@ -234,14 +234,6 @@ public class ImmutableSegmentImpl implements ImmutableSegment { return _segmentMetadata; } - @Override - public DataSource getDataSource(String column) { - DataSource result = _dataSources.get(column); - Preconditions.checkNotNull(result, - "DataSource for %s should not be null. Potentially invalid column name specified.", column); - return result; - } - @Override public Set<String> getColumnNames() { return _segmentMetadata.getSchema().getColumnNames(); @@ -306,6 +298,13 @@ public class ImmutableSegmentImpl implements ImmutableSegment { } } + @Nullable + @Override + public DataSource getDataSourceNullable(String column) { + return _dataSources.get(column); + } + + @Nullable @Override public List<StarTreeV2> getStarTrees() { return _starTreeIndexContainer != null ? 
_starTreeIndexContainer.getStarTrees() : null; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index 17bf891010..ae1ffae0c3 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -1048,24 +1048,27 @@ public class MutableSegmentImpl implements MutableSegment { return physicalColumnNames; } + @Nullable @Override - public DataSource getDataSource(String column) { + public DataSource getDataSourceNullable(String column) { IndexContainer indexContainer = _indexContainerMap.get(column); if (indexContainer != null) { // Physical column return indexContainer.toDataSource(); - } else { + } + FieldSpec fieldSpec = _schema.getFieldSpecFor(column); + if (fieldSpec != null && fieldSpec.isVirtualColumn()) { // Virtual column - FieldSpec fieldSpec = _schema.getFieldSpecFor(column); - Preconditions.checkState(fieldSpec != null && fieldSpec.isVirtualColumn(), "Failed to find column: %s", column); // TODO: Refactor virtual column provider to directly generate data source VirtualColumnContext virtualColumnContext = new VirtualColumnContext(fieldSpec, _numDocsIndexed); VirtualColumnProvider virtualColumnProvider = VirtualColumnProviderFactory.buildProvider(virtualColumnContext); return new ImmutableDataSource(virtualColumnProvider.buildMetadata(virtualColumnContext), virtualColumnProvider.buildColumnIndexContainer(virtualColumnContext)); } + return null; } + @Nullable @Override public List<StarTreeV2> getStarTrees() { return null; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java index 
53f7b99558..4fdf1d6fab 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.spi; +import com.google.common.base.Preconditions; import java.util.List; import java.util.Set; import javax.annotation.Nullable; @@ -25,6 +26,7 @@ import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap; import org.apache.pinot.segment.spi.index.startree.StarTreeV2; import org.apache.pinot.spi.annotations.InterfaceAudience; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -59,17 +61,30 @@ public interface IndexSegment { */ Set<String> getPhysicalColumnNames(); - /** - * Returns the {@link DataSource} for the given column. - * - * @param columnName Column name - * @return Data source for the given column - */ - DataSource getDataSource(String columnName); + /// Returns the [DataSource] for the given column. + /// TODO: Revisit all usage of this method to support virtual [DataSource]. + default DataSource getDataSource(String column) { + DataSource dataSource = getDataSourceNullable(column); + Preconditions.checkState(dataSource != null, "Failed to find data source for column: %s", column); + return dataSource; + } + + /// Returns the [DataSource] for the given column, or `null` if the column does not exist in the segment. + @Nullable + DataSource getDataSourceNullable(String column); + + /// Returns the [DataSource] for the given column, or creates a virtual one if it doesn't exist. The passed in + /// [Schema] should be the latest schema of the table, not the one from [SegmentMetadata], and should contain the + /// asked column. + /// TODO: Add support for virtual [DataSource]. 
+ default DataSource getDataSource(String column, Schema schema) { + return getDataSource(column); + } /** * Returns a list of star-trees (V2), or null if there is no star-tree (V2) in the segment. */ + @Nullable List<StarTreeV2> getStarTrees(); /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org