This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new ecba1cc262  Support for Virtual DataSource (#15350)
ecba1cc262 is described below

commit ecba1cc2621bad58e59b3ff1b1c2f2fbc79c5b53
Author: Praveen <praveenkchagan...@gmail.com>
AuthorDate: Fri Jun 13 15:37:51 2025 -0700

    Support for Virtual DataSource (#15350)
---
 .../query/pruner/BloomFilterSegmentPruner.java     |  48 +++---
 .../query/pruner/ColumnValueSegmentPruner.java     |  30 ++--
 .../core/query/pruner/SegmentPrunerService.java    |  19 +--
 .../query/pruner/SelectionQuerySegmentPruner.java  |   3 +-
 .../core/query/pruner/ValueBasedSegmentPruner.java |  19 ++-
 .../query/pruner/BloomFilterSegmentPrunerTest.java |   4 +-
 .../query/pruner/ColumnValueSegmentPrunerTest.java |   9 +-
 .../query/pruner/SegmentPrunerServiceTest.java     |  20 ---
 .../pruner/SelectionQuerySegmentPrunerTest.java    |  12 +-
 .../tests/OfflineClusterIntegrationTest.java       | 168 +++++++++++++++++++++
 .../integration/tests/custom/MapTypeTest.java      |   4 +-
 .../IndexSegmentUtils.java}                        |  33 ++--
 .../indexsegment/immutable/EmptyIndexSegment.java  |  17 ++-
 .../immutable/ImmutableSegmentImpl.java            |  16 ++
 .../indexsegment/mutable/MutableSegmentImpl.java   |  20 ++-
 .../DefaultNullValueVirtualColumnProvider.java     |  12 +-
 .../segment/index/datasource/EmptyDataSource.java  |   9 +-
 .../virtualcolumn/DocIdVirtualColumnProvider.java  |  23 +--
 .../virtualcolumn/VirtualColumnProvider.java       |  14 +-
 .../org/apache/pinot/segment/spi/IndexSegment.java |   9 +-
 20 files changed, 349 insertions(+), 140 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
index 6277aac8eb..8fa24b2740 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPruner.java
@@ -49,7 +49,6 @@ import org.apache.pinot.spi.exception.QueryCancelledException;
  * The {@code BloomFilterSegmentPruner} prunes segments based on bloom filter for EQUALITY filter. Because the access
  * to bloom filter data is required, segment pruning is done in parallel when the number of segments is large.
  */
-@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
 public class BloomFilterSegmentPruner extends ValueBasedSegmentPruner {
   // Try to schedule 10 segments for each thread, or evenly distribute them to all MAX_NUM_THREADS_PER_QUERY threads.
   // TODO: make this threshold configurable? threshold 10 is also used in CombinePlanNode, which accesses the
@@ -77,6 +76,7 @@ public class BloomFilterSegmentPruner extends ValueBasedSegmentPruner {
     if (predicateType == Predicate.Type.IN) {
       List<String> values = ((InPredicate) predicate).getValues();
       // Skip pruning when there are too many values in the IN predicate
+      //noinspection RedundantIfStatement
       if (values.size() <= _inPredicateThreshold) {
         return true;
       }
@@ -101,7 +101,7 @@
     for (int i = 0; i < numSegments; i++) {
       dataSourceCache.clear();
       IndexSegment segment = segments.get(i);
-      if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter, dataSourceCache, cachedValues)) {
+      if (!pruneSegmentWithFetchContext(segment, fetchContexts[i], filter, dataSourceCache, cachedValues, query)) {
         selectedSegments.add(segment);
       }
     }
@@ -131,20 +131,20 @@
         fetchContexts -> pruneInParallel(numTasks, segments, query, executorService, fetchContexts));
   }

-  private List<IndexSegment> pruneInParallel(int numTasks, List<IndexSegment> segments, QueryContext queryContext,
-      ExecutorService executorService, FetchContext[] fetchContexts) {
+  private List<IndexSegment> pruneInParallel(int numTasks, List<IndexSegment> segments, QueryContext query,
+      ExecutorService executorService, @Nullable FetchContext[] fetchContexts) {
     int numSegments = segments.size();
     List<IndexSegment> allSelectedSegments = new ArrayList<>();
     QueryMultiThreadingUtils.runTasksWithDeadline(numTasks, index -> {
-      FilterContext filter = Objects.requireNonNull(queryContext.getFilter());
+      FilterContext filter = Objects.requireNonNull(query.getFilter());
       ValueCache cachedValues = new ValueCache();
       Map<String, DataSource> dataSourceCache = new HashMap<>();
       List<IndexSegment> selectedSegments = new ArrayList<>();
       for (int i = index; i < numSegments; i += numTasks) {
         dataSourceCache.clear();
         IndexSegment segment = segments.get(i);
-        FetchContext fetchContext = fetchContexts == null ? null : fetchContexts[i];
-        if (!pruneSegmentWithFetchContext(segment, fetchContext, filter, dataSourceCache, cachedValues)) {
+        FetchContext fetchContext = fetchContexts != null ? fetchContexts[i] : null;
+        if (!pruneSegmentWithFetchContext(segment, fetchContext, filter, dataSourceCache, cachedValues, query)) {
           selectedSegments.add(segment);
         }
       }
@@ -158,7 +158,7 @@
         throw new QueryCancelledException("Cancelled while running BloomFilterSegmentPruner", e);
       }
       throw new RuntimeException("Caught exception while running BloomFilterSegmentPruner", e);
-    }, executorService, queryContext.getEndTimeMs());
+    }, executorService, query.getEndTimeMs());
     return allSelectedSegments;
   }

@@ -188,14 +188,14 @@
     }
   }

-  private boolean pruneSegmentWithFetchContext(IndexSegment segment, FetchContext fetchContext, FilterContext filter,
-      Map<String, DataSource> dataSourceCache, ValueCache cachedValues) {
+  private boolean pruneSegmentWithFetchContext(IndexSegment segment, @Nullable FetchContext fetchContext,
+      FilterContext filter, Map<String, DataSource> dataSourceCache, ValueCache cachedValues, QueryContext query) {
     if (fetchContext == null) {
-      return pruneSegment(segment, filter, dataSourceCache, cachedValues);
+      return pruneSegment(segment, filter, dataSourceCache, cachedValues, query);
     }
+    segment.acquire(fetchContext);
     try {
-      segment.acquire(fetchContext);
-      return pruneSegment(segment, filter, dataSourceCache, cachedValues);
+      return pruneSegment(segment, filter, dataSourceCache, cachedValues, query);
     } finally {
       segment.release(fetchContext);
     }
@@ -203,7 +203,7 @@

   @Override
   boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate predicate, Map<String, DataSource> dataSourceCache,
-      ValueCache cachedValues) {
+      ValueCache cachedValues, QueryContext query) {
     Predicate.Type predicateType = predicate.getType();
     if (predicateType == Predicate.Type.EQ) {
       return pruneEqPredicate(segment, (EqPredicate) predicate, dataSourceCache, cachedValues);
@@ -220,10 +220,12 @@
   private boolean pruneEqPredicate(IndexSegment segment, EqPredicate eqPredicate,
       Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
     String column = eqPredicate.getLhs().getIdentifier();
-    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSource(column)
-        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
-    // NOTE: Column must exist after DataSchemaSegmentPruner
-    assert dataSource != null;
+    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSourceNullable(column)
+        : dataSourceCache.computeIfAbsent(column, segment::getDataSourceNullable);
+    if (dataSource == null) {
+      // Column does not exist, cannot prune
+      return false;
+    }
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
     ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate, dataSourceMetadata.getDataType());
     // Check bloom filter
@@ -243,10 +245,12 @@
       return false;
     }
     String column = inPredicate.getLhs().getIdentifier();
-    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSource(column)
-        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
-    // NOTE: Column must exist after DataSchemaSegmentPruner
-    assert dataSource != null;
+    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSourceNullable(column)
+        : dataSourceCache.computeIfAbsent(column, segment::getDataSourceNullable);
+    if (dataSource == null) {
+      // Column does not exist, cannot prune
+      return false;
+    }
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
     List<ValueCache.CachedValue> cachedValues = valueCache.get(inPredicate, dataSourceMetadata.getDataType());
     // Check bloom filter
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index 8eead9f552..7b5a4b507b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -26,6 +26,7 @@ import org.apache.pinot.common.request.context.predicate.EqPredicate;
 import org.apache.pinot.common.request.context.predicate.InPredicate;
 import org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.common.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.datasource.DataSource;
@@ -78,14 +79,14 @@
   @Override
   boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate predicate, Map<String, DataSource> dataSourceCache,
-      ValueCache cachedValues) {
+      ValueCache cachedValues, QueryContext query) {
     Predicate.Type predicateType = predicate.getType();
     if (predicateType == Predicate.Type.EQ) {
-      return pruneEqPredicate(segment, (EqPredicate) predicate, dataSourceCache, cachedValues);
+      return pruneEqPredicate(segment, (EqPredicate) predicate, dataSourceCache, cachedValues, query);
     } else if (predicateType == Predicate.Type.IN) {
-      return pruneInPredicate(segment, (InPredicate) predicate, dataSourceCache, cachedValues);
+      return pruneInPredicate(segment, (InPredicate) predicate, dataSourceCache, cachedValues, query);
     } else if (predicateType == Predicate.Type.RANGE) {
-      return pruneRangePredicate(segment, (RangePredicate) predicate, dataSourceCache);
+      return pruneRangePredicate(segment, (RangePredicate) predicate, dataSourceCache, query);
     } else {
       return false;
     }
   }
@@ -99,11 +100,10 @@
    * </ul>
    */
   private boolean pruneEqPredicate(IndexSegment segment, EqPredicate eqPredicate,
-      Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
+      Map<String, DataSource> dataSourceCache, ValueCache valueCache, QueryContext query) {
     String column = eqPredicate.getLhs().getIdentifier();
-    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSource(column)
-        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
-    // NOTE: Column must exist after DataSchemaSegmentPruner
+    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSource(column, query.getSchema())
+        : dataSourceCache.computeIfAbsent(column, col -> segment.getDataSource(column, query.getSchema()));
     assert dataSource != null;
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
     ValueCache.CachedValue cachedValue = valueCache.get(eqPredicate, dataSourceMetadata.getDataType());
@@ -131,16 +131,15 @@
    * <p>NOTE: segments will not be pruned if the number of values is greater than the threshold.
    */
   private boolean pruneInPredicate(IndexSegment segment, InPredicate inPredicate,
-      Map<String, DataSource> dataSourceCache, ValueCache valueCache) {
+      Map<String, DataSource> dataSourceCache, ValueCache valueCache, QueryContext query) {
     List<String> values = inPredicate.getValues();
     // Skip pruning when there are too many values in the IN predicate
     if (values.size() > _inPredicateThreshold) {
       return false;
     }
     String column = inPredicate.getLhs().getIdentifier();
-    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSource(column)
-        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
-    // NOTE: Column must exist after DataSchemaSegmentPruner
+    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSource(column, query.getSchema())
+        : dataSourceCache.computeIfAbsent(column, col -> segment.getDataSource(column, query.getSchema()));
     assert dataSource != null;
     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
     List<ValueCache.CachedValue> cachedValues = valueCache.get(inPredicate, dataSourceMetadata.getDataType());
@@ -160,11 +159,10 @@
    * </ul>
    */
   private boolean pruneRangePredicate(IndexSegment segment, RangePredicate rangePredicate,
-      Map<String, DataSource> dataSourceCache) {
+      Map<String, DataSource> dataSourceCache, QueryContext query) {
     String column = rangePredicate.getLhs().getIdentifier();
-    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSource(column)
-        : dataSourceCache.computeIfAbsent(column, segment::getDataSource);
-    // NOTE: Column must exist after DataSchemaSegmentPruner
+    DataSource dataSource = segment instanceof ImmutableSegment ? segment.getDataSource(column, query.getSchema())
+        : dataSourceCache.computeIfAbsent(column, col -> segment.getDataSource(column, query.getSchema()));
     assert dataSource != null;

     DataSourceMetadata dataSourceMetadata = dataSource.getDataSourceMetadata();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
index 4b775f67bd..a426cb7abe 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
@@ -105,7 +105,7 @@ public class SegmentPrunerService {
   public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query, SegmentPrunerStatistics stats,
       @Nullable ExecutorService executorService) {
     try (InvocationScope scope = Tracing.getTracer().createScope(SegmentPrunerService.class)) {
-      segments = removeInvalidSegments(segments, query, stats);
+      segments = removeEmptySegments(segments);
       int invokedPrunersCount = 0;
       for (SegmentPruner segmentPruner : _segmentPruners) {
         if (segmentPruner.isApplicableTo(query)) {
@@ -124,7 +124,7 @@ public class SegmentPrunerService {
   }

   /**
-   * Filters the given list, returning a list that only contains the valid segments, modifying the list received as
+   * Filters the given list, returning a list that only contains the non-empty segments, modifying the list received as
    * argument.
    *
    * <p>
@@ -136,28 +136,17 @@ public class SegmentPrunerService {
    * undefined way. Therefore, this list should not be used after calling this method.
    * @return the new list with filtered elements. This is the list that have to be used.
    */
-  private static List<IndexSegment> removeInvalidSegments(List<IndexSegment> segments, QueryContext query,
-      SegmentPrunerStatistics stats) {
+  private static List<IndexSegment> removeEmptySegments(List<IndexSegment> segments) {
     int selected = 0;
-    int invalid = 0;
     for (IndexSegment segment : segments) {
       if (!isEmptySegment(segment)) {
-        if (isInvalidSegment(segment, query)) {
-          invalid++;
-        } else {
-          segments.set(selected++, segment);
-        }
+        segments.set(selected++, segment);
       }
     }
-    stats.setInvalidSegments(invalid);
     return segments.subList(0, selected);
   }

   private static boolean isEmptySegment(IndexSegment segment) {
     return segment.getSegmentMetadata().getTotalDocs() == 0;
   }
-
-  private static boolean isInvalidSegment(IndexSegment segment, QueryContext query) {
-    return !segment.getColumnNames().containsAll(query.getColumns());
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
index 119f128fbb..6f2e29a224 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
@@ -126,7 +126,8 @@ public class SelectionQuerySegmentPruner implements SegmentPruner {
     List<MinMaxValue> minMaxValues = new ArrayList<>(numSegments);
     for (int i = 0; i < numSegments; i++) {
       IndexSegment segment = segments.get(i);
-      DataSourceMetadata dataSourceMetadata = segment.getDataSource(firstOrderByColumn).getDataSourceMetadata();
+      DataSourceMetadata dataSourceMetadata =
+          segment.getDataSource(firstOrderByColumn, query.getSchema()).getDataSourceMetadata();
       Comparable minValue = dataSourceMetadata.getMinValue();
       Comparable maxValue = dataSourceMetadata.getMaxValue();
       // Always keep the segment if it does not have column min/max value in the metadata
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
index 2e3214c8c2..14dd4e4764 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValueBasedSegmentPruner.java
@@ -43,7 +43,7 @@ import org.apache.pinot.spi.utils.CommonConstants.Server;
 /**
  * The {@code ValueBasedSegmentPruner} prunes segments based on values inside the filter and segment metadata and data.
  */
-@SuppressWarnings({"rawtypes", "unchecked", "RedundantIfStatement"})
+@SuppressWarnings({"rawtypes", "unchecked"})
 abstract public class ValueBasedSegmentPruner implements SegmentPruner {
   public static final String IN_PREDICATE_THRESHOLD = "inpredicate.threshold";
   protected int _inPredicateThreshold;
@@ -72,6 +72,7 @@ abstract public class ValueBasedSegmentPruner implements SegmentPruner {
   private boolean isApplicableToFilter(FilterContext filter) {
     switch (filter.getType()) {
       case AND:
+        assert filter.getChildren() != null;
         for (FilterContext child : filter.getChildren()) {
           if (isApplicableToFilter(child)) {
             return true;
@@ -79,6 +80,7 @@
         }
         return false;
       case OR:
+        assert filter.getChildren() != null;
         for (FilterContext child : filter.getChildren()) {
           if (!isApplicableToFilter(child)) {
             return false;
@@ -108,7 +110,7 @@ abstract public class ValueBasedSegmentPruner implements SegmentPruner {
     List<IndexSegment> selectedSegments = new ArrayList<>(segments.size());
     for (IndexSegment segment : segments) {
       dataSourceCache.clear();
-      if (!pruneSegment(segment, filter, dataSourceCache, cachedValues)) {
+      if (!pruneSegment(segment, filter, dataSourceCache, cachedValues, query)) {
         selectedSegments.add(segment);
       }
     }
@@ -116,18 +118,20 @@
   }

   protected boolean pruneSegment(IndexSegment segment, FilterContext filter, Map<String, DataSource> dataSourceCache,
-      ValueCache cachedValues) {
+      ValueCache cachedValues, QueryContext query) {
     switch (filter.getType()) {
       case AND:
+        assert filter.getChildren() != null;
         for (FilterContext child : filter.getChildren()) {
-          if (pruneSegment(segment, child, dataSourceCache, cachedValues)) {
+          if (pruneSegment(segment, child, dataSourceCache, cachedValues, query)) {
            return true;
          }
        }
        return false;
      case OR:
+        assert filter.getChildren() != null;
        for (FilterContext child : filter.getChildren()) {
-          if (!pruneSegment(segment, child, dataSourceCache, cachedValues)) {
+          if (!pruneSegment(segment, child, dataSourceCache, cachedValues, query)) {
            return false;
          }
        }
@@ -137,18 +141,19 @@
       return false;
     case PREDICATE:
       Predicate predicate = filter.getPredicate();
+      assert predicate != null;
       // Only prune columns
       if (predicate.getLhs().getType() != ExpressionContext.Type.IDENTIFIER) {
         return false;
       }
-      return pruneSegmentWithPredicate(segment, predicate, dataSourceCache, cachedValues);
+      return pruneSegmentWithPredicate(segment, predicate, dataSourceCache, cachedValues, query);
     default:
       throw new IllegalStateException();
     }
   }

   abstract boolean pruneSegmentWithPredicate(IndexSegment segment, Predicate predicate,
-      Map<String, DataSource> dataSourceCache, ValueCache cachedValues);
+      Map<String, DataSource> dataSourceCache, ValueCache cachedValues, QueryContext query);

   protected static Comparable convertValue(String stringValue, DataType dataType) {
     try {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
index 75b2beea0f..93b6552676 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/BloomFilterSegmentPrunerTest.java
@@ -101,7 +101,7 @@ public class BloomFilterSegmentPrunerTest {
       throws IOException {
     IndexSegment indexSegment = mockIndexSegment(new String[]{"1.0", "2.0", "3.0", "5.0", "7.0", "21.0"});
     DataSource dataSource = mock(DataSource.class);
-    when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+    when(indexSegment.getDataSourceNullable("column")).thenReturn(dataSource);
     runPruner(Collections.singletonList(indexSegment),
         "SELECT COUNT(*) FROM testTable WHERE column = 5.0 OR column = 0.0", 1);
   }
@@ -169,7 +169,7 @@ public class BloomFilterSegmentPrunerTest {
     when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);

     DataSource dataSource = mock(DataSource.class);
-    when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+    when(indexSegment.getDataSourceNullable("column")).thenReturn(dataSource);
     // Add support for bloom filter
     DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
     BloomFilterReaderBuilder builder = new BloomFilterReaderBuilder();
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
index 5a5d7bd505..ffb74cec8c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
@@ -31,10 +31,13 @@ import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
 import org.apache.pinot.segment.spi.partition.PartitionFunctionFactory;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;

+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertFalse;
@@ -58,8 +61,7 @@ public class ColumnValueSegmentPrunerTest {
     IndexSegment indexSegment = mockIndexSegment();

     DataSource dataSource = mock(DataSource.class);
-    when(indexSegment.getDataSource("column")).thenReturn(dataSource);
-
+    when(indexSegment.getDataSource(eq("column"), any(Schema.class))).thenReturn(dataSource);
     DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
     when(dataSourceMetadata.getDataType()).thenReturn(DataType.INT);
     when(dataSourceMetadata.getMinValue()).thenReturn(10);
@@ -115,7 +117,7 @@ public class ColumnValueSegmentPrunerTest {
     IndexSegment indexSegment = mockIndexSegment();

     DataSource dataSource = mock(DataSource.class);
-    when(indexSegment.getDataSource("column")).thenReturn(dataSource);
+    when(indexSegment.getDataSource(eq("column"), any(Schema.class))).thenReturn(dataSource);

     DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
     when(dataSourceMetadata.getDataType()).thenReturn(DataType.INT);
@@ -190,6 +192,7 @@ public class ColumnValueSegmentPrunerTest {

   private boolean runPruner(IndexSegment indexSegment, String query) {
     QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
+    queryContext.setSchema(mock(Schema.class));
     return PRUNER.prune(Arrays.asList(indexSegment), queryContext).isEmpty();
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
index 62da031fd4..c18c670c57 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SegmentPrunerServiceTest.java
@@ -87,26 +87,6 @@ public class SegmentPrunerServiceTest {
     Assert.assertEquals(stats.getInvalidSegments(), 0);
   }

-  @Test
-  public void segmentsWithoutColumnAreInvalid() {
-    SegmentPrunerService service = new SegmentPrunerService(_emptyPrunerConf);
-    IndexSegment indexSegment = mockIndexSegment(10, "col1", "col2");
-
-    SegmentPrunerStatistics stats = new SegmentPrunerStatistics();
-
-    List<IndexSegment> indexes = new ArrayList<>();
-    indexes.add(indexSegment);
-
-    String query = "select not_present from t1";
-
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContext(query);
-
-    List<IndexSegment> actual = service.prune(indexes, queryContext, stats);
-
-    Assert.assertEquals(actual, Collections.emptyList());
-    Assert.assertEquals(1, stats.getInvalidSegments());
-  }
-
   private IndexSegment mockIndexSegment(int totalDocs, String... columns) {
     IndexSegment indexSegment = mock(IndexSegment.class);
     when(indexSegment.getColumnNames()).thenReturn(new HashSet<>(Arrays.asList(columns)));
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
index 63afba81f9..2f65ef11af 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
@@ -29,8 +29,11 @@ import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
 import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.spi.data.Schema;
 import org.testng.annotations.Test;

+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
@@ -110,9 +113,11 @@ public class SelectionQuerySegmentPrunerTest {
         getIndexSegment(5L, 10L, 10),   // 6
         getIndexSegment(15L, 30L, 15)); // 7

+    Schema schema = mock(Schema.class);
     // Should keep segments: [null, null], [-5, 5], [0, 10]
     QueryContext queryContext =
         QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY testColumn LIMIT 5");
+    queryContext.setSchema(schema);
     List<IndexSegment> result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 3);
     assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -122,6 +127,7 @@ public class SelectionQuerySegmentPrunerTest {
     // Should keep segments: [null, null], [-5, 5], [0, 10], [5, 10], [5, 15]
     queryContext =
         QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY testColumn LIMIT 15, 20");
+    queryContext.setSchema(schema);
     result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 5);
     assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -134,6 +140,7 @@ public class SelectionQuerySegmentPrunerTest {
     // Should keep segments: [null, null], [-5, 5], [0, 10], [5, 10], [5, 15], [15, 30], [15, 50]
     queryContext =
         QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY testColumn, foo LIMIT 40");
+    queryContext.setSchema(schema);
     result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 7);
     assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -148,6 +155,7 @@ public class SelectionQuerySegmentPrunerTest {
     // Should keep segments: [null, null], [20, 30], [15, 50], [15, 30]
     queryContext =
         QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable ORDER BY testColumn DESC LIMIT 5");
+    queryContext.setSchema(schema);
     result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 4);
     assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -159,6 +167,7 @@ public class SelectionQuerySegmentPrunerTest {
     // Should keep segments: [null, null], [20, 30], [15, 50], [15, 30]
     queryContext = QueryContextConverterUtils
         .getQueryContext("SELECT * FROM testTable ORDER BY testColumn DESC LIMIT 5, 30");
+    queryContext.setSchema(schema);
     result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 4);
     assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -170,6 +179,7 @@ public class SelectionQuerySegmentPrunerTest {
     // Should keep segments: [null, null], [20, 30], [15, 50], [15, 30], [5, 15], [5, 10], [0, 10], [-5, 5]
     queryContext = QueryContextConverterUtils
         .getQueryContext("SELECT * FROM testTable ORDER BY testColumn DESC, foo LIMIT 60");
+    queryContext.setSchema(schema);
     result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 8);
     assertSame(result.get(0), indexSegments.get(5)); // [null, null], 5
@@ -210,7 +220,7 @@ public class SelectionQuerySegmentPrunerTest {
     IndexSegment indexSegment = mock(IndexSegment.class);
     when(indexSegment.getColumnNames()).thenReturn(ImmutableSet.of("foo", "testColumn"));
     DataSource dataSource = mock(DataSource.class);
-    when(indexSegment.getDataSource(ORDER_BY_COLUMN)).thenReturn(dataSource);
+    when(indexSegment.getDataSource(eq(ORDER_BY_COLUMN), any(Schema.class))).thenReturn(dataSource);
     DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
     when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
     when(dataSourceMetadata.getMinValue()).thenReturn(minValue);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 1c87c1eee3..7ce615a3b1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -33,6 +33,7 @@ import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -41,6 +42,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
+import java.util.stream.IntStream;
 import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.hc.core5.http.Header;
@@ -56,6 +58,7 @@ import org.apache.pinot.client.PinotDriver;
 import org.apache.pinot.common.exception.HttpErrorStatusException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.response.server.TableIndexMetadataResponse;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
@@ -76,6 +79,7 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
@@ -4086,4 +4090,168 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       fail("Segment reload failed with exception: " + e.getMessage());
     }
   }
+
+  @Test
+  public void testVirtualColumnWithPartialReload() throws Exception {
+    Schema oldSchema = getSchema(DEFAULT_SCHEMA_NAME);
+    // Pick any existing INT column name for the “valid” cases
+    String validColumnName = oldSchema.getAllFieldSpecs().stream()
+        .filter(fs -> fs.getDataType() == DataType.INT)
+        .findFirst()
+        .get()
+        .getName();
+    // New column name that is not in the schema
+    String newColumn = "newColumn";
+    DataType newdataType = DataType.INT;
+    // Test queries
+    List<String> queries = List.of(
+        SELECT_STAR_QUERY,
+        SELECT_STAR_QUERY + " WHERE " + validColumnName + " > 0 LIMIT 10000",
+        SELECT_STAR_QUERY + " ORDER BY " + validColumnName + " LIMIT 10000",
+        SELECT_STAR_QUERY + " ORDER BY " + newColumn + " LIMIT 10000",
+        "SELECT " + newColumn + " FROM " + DEFAULT_TABLE_NAME
+    );
+    for (String query: queries) {
+      try {
+        // Build new schema with the extra column
+        Schema newSchema = new Schema();
+        newSchema.setSchemaName(oldSchema.getSchemaName());
+        for (FieldSpec fs : oldSchema.getAllFieldSpecs()) {
+          newSchema.addField(fs);
+        }
+        FieldSpec newFieldSpec = new DimensionFieldSpec(newColumn, newdataType, true);
+        newSchema.addField(newFieldSpec);
+        updateSchema(newSchema);
+
+        // Partially reload one segment
+        reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE",
+            listSegments(DEFAULT_TABLE_NAME + "_OFFLINE").get(0));
+        // Column should show since it would be added as a virtual column
+        runQueryAndAssert(query, newColumn, newFieldSpec);
+        // Now do a full reload and the column should still be there, indicating there is no regression
+        reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE", null);
+        runQueryAndAssert(query, newColumn, newFieldSpec);
+      } finally {
+        // Reset back to the original schema for the next iteration
+        forceUpdateSchema(oldSchema);
+        reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE", null);
+      }
+    }
+  }
+
+  @Test
+  public void testVirtualColumnAfterReloadForDifferentDataTypes() throws Exception {
+    Schema oldSchema = getSchema(DEFAULT_SCHEMA_NAME);
+    try {
+      // Build a new schema: copy everything, then add one virtual column per DataType.
+      Schema newSchema = new Schema();
+      oldSchema.getAllFieldSpecs().forEach(newSchema::addField);
+      // Keep insertion order – helps when debugging.
+      Map<String, FieldSpec> newCols = new LinkedHashMap<>();
+      List<DataType> newDataTypes = List.of(
+          DataType.INT, DataType.LONG, DataType.FLOAT, DataType.DOUBLE,
+          DataType.STRING, DataType.BOOLEAN, DataType.BYTES);
+      for (DataType dt : newDataTypes) {
+        String col = "col_" + dt.name().toLowerCase();
+        FieldSpec fs = new DimensionFieldSpec(col, dt, true);
+        newCols.put(col, fs);
+        newSchema.addField(fs);
+      }
+      newSchema.setSchemaName(oldSchema.getSchemaName());
+      updateSchema(newSchema);
+
+      // Reload segment.
+      reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE", listSegments(DEFAULT_TABLE_NAME + "_OFFLINE").get(0));
+
+      JsonNode res = postQuery("SELECT * FROM " + DEFAULT_TABLE_NAME + " LIMIT 5000");
+      assertNoError(res);
+      JsonNode rows = res.get("resultTable").get("rows");
+      DataSchema resultSchema =
+          JsonUtils.jsonNodeToObject(res.get("resultTable").get("dataSchema"), DataSchema.class);
+
+      // Verify each new column.
+      for (Map.Entry<String, FieldSpec> e : newCols.entrySet()) {
+        String col = e.getKey();
+        FieldSpec fs = e.getValue();
+
+        int idx = IntStream.range(0, resultSchema.size())
+            .filter(i -> resultSchema.getColumnName(i).equals(col))
+            .findFirst()
+            .orElseThrow(() -> new AssertionError("Column " + col + " missing"));
+
+        Assert.assertEquals(resultSchema.getColumnDataType(idx).name(), fs.getDataType().name(),
+            "Mismatch in reported type for " + col);
+
+        for (JsonNode row : rows) {
+          String expectedDefault;
+          if (fs.getDataType() == DataType.BOOLEAN) {
+            // Pinot surfaces boolean default nulls as literal "false"
+            expectedDefault = "false";
+          } else {
+            expectedDefault = fs.getDefaultNullValueString();
+          }
+          Assert.assertEquals(row.get(idx).asText(), expectedDefault, "Unexpected default value for " + col);
+        }
+      }
+    } finally {
+      // Clean up the schema to the original state
+      forceUpdateSchema(oldSchema);
+      reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE", null);
+    }
+  }
+
+  private void reloadAndWait(String tableNameWithType, @Nullable String segmentName) throws Exception {
+    String response = (segmentName != null)
+        ? reloadOfflineSegment(tableNameWithType, segmentName, true)
+        : reloadOfflineTable(tableNameWithType, true);
+    JsonNode responseJson = JsonUtils.stringToJsonNode(response);
+    String jobId;
+    if (segmentName != null) {
+      // Single segment reload response: status is a string, parse manually
+      String statusString = responseJson.get("status").asText();
+      assertTrue(statusString.contains("SUCCESS"), "Segment reload failed: " + statusString);
+      int startIdx = statusString.indexOf("reload job id:") + "reload job id:".length();
+      int endIdx = statusString.indexOf(',', startIdx);
+      jobId = statusString.substring(startIdx, endIdx).trim();
+    } else {
+      // Full table reload response: structured JSON
+      JsonNode tableLevelDetails
+          = JsonUtils.stringToJsonNode(responseJson.get("status").asText()).get(tableNameWithType);
+      Assert.assertEquals(tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText(), "SUCCESS");
+      jobId = tableLevelDetails.get("reloadJobId").asText();
+    }
+    String finalJobId = jobId;
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return isReloadJobCompleted(finalJobId);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 600_000L, "Reload job did not complete in 10 minutes");
+  }
+
+  private void runQueryAndAssert(String query, String newAddedColumn, FieldSpec fieldSpec) throws Exception {
+    JsonNode response = postQuery(query);
+    assertNoError(response);
+    JsonNode rows = response.get("resultTable").get("rows");
+    JsonNode jsonSchema = response.get("resultTable").get("dataSchema");
+    DataSchema resultSchema = JsonUtils.jsonNodeToObject(jsonSchema, DataSchema.class);
+
+    assert !rows.isEmpty();
+    boolean columnPresent = false;
+    String[] columnNames = resultSchema.getColumnNames();
+    for (int columnIndex = 0; columnIndex < columnNames.length; columnIndex++) {
+      if (columnNames[columnIndex].equals(newAddedColumn)) {
+        columnPresent = true;
+        // Check the data type of the new column
+        Assert.assertEquals(resultSchema.getColumnDataType(columnIndex).name(), fieldSpec.getDataType().name());
+        // Check the value of the new column
+        for (int rowIndex = 0; rowIndex < rows.size(); rowIndex++) {
+          Assert.assertEquals(rows.get(rowIndex).get(columnIndex).asText(),
+              String.valueOf(fieldSpec.getDefaultNullValue()));
+        }
+      }
+    }
+    Assert.assertTrue(columnPresent, "Column " + newAddedColumn + " not present in result set");
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
index 578ee5e5ea..fdf88db410 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/MapTypeTest.java
@@ -384,13 +384,13 @@
     // Select non-existing key with proper filter
     query = "SELECT jsonExtractScalar(intKeyMapStr, '$.123', 'INT') FROM " + getTableName()
-        + " WHERE jsonExtractKey(intKeyMapStr, '$.*') = \"$['123']\"";
+        + " WHERE jsonExtractKey(intKeyMapStr, '$.*') = '$[''123'']'";
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
     assertEquals(rows.size(), 0);

     query = "SELECT jsonExtractScalar(stringKeyMapStr, '$.k3', 'INT') FROM " + getTableName()
-        + " WHERE jsonExtractKey(stringKeyMapStr, '$.*') = \"$['k3']\"";
+        + " WHERE jsonExtractKey(stringKeyMapStr, '$.*') = '$[''k3'']'";
     pinotResponse = postQuery(query);
     assertEquals(pinotResponse.get("exceptions").size(), 0);
     rows = pinotResponse.get("resultTable").get("rows");
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/IndexSegmentUtils.java
similarity index 54%
rename from pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java
rename to pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/IndexSegmentUtils.java
index 8f6528fb78..d9146fe720 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/BaseVirtualColumnProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/IndexSegmentUtils.java
@@ -16,28 +16,29 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.segment.local.segment.index.column;
+package org.apache.pinot.segment.local.indexsegment;

+import org.apache.pinot.segment.local.segment.index.column.DefaultNullValueVirtualColumnProvider;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
-import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnIndexContainer;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
-import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
-import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
+import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.segment.spi.datasource.DataSource;
+import org.apache.pinot.spi.data.FieldSpec;

-/**
- * Shared implementation code between column providers.
- */
-public abstract class BaseVirtualColumnProvider implements VirtualColumnProvider {
-
-  protected ColumnMetadataImpl.Builder getColumnMetadataBuilder(VirtualColumnContext context) {
-    return new ColumnMetadataImpl.Builder().setFieldSpec(context.getFieldSpec())
-        .setTotalDocs(context.getTotalDocCount());
+public class IndexSegmentUtils {
+  private IndexSegmentUtils() {
   }

-  @Override
-  public ColumnIndexContainer buildColumnIndexContainer(VirtualColumnContext context) {
-    return new VirtualColumnIndexContainer(buildForwardIndex(context), buildInvertedIndex(context),
-        buildDictionary(context));
+  /// Returns a virtual [DataSource] per the given [VirtualColumnContext].
+  public static DataSource createVirtualDataSource(VirtualColumnContext context) {
+    FieldSpec fieldSpec = context.getFieldSpec();
+    VirtualColumnProvider virtualColumnProvider;
+    if (fieldSpec.getVirtualColumnProvider() != null) {
+      virtualColumnProvider = VirtualColumnProviderFactory.buildProvider(context);
+    } else {
+      virtualColumnProvider = new DefaultNullValueVirtualColumnProvider();
+    }
+    return virtualColumnProvider.buildDataSource(context);
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
index b66edb8a0b..5b240e1f22 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/EmptyIndexSegment.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.segment.local.indexsegment.immutable;

+import com.google.common.base.Preconditions;
 import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
@@ -33,6 +34,8 @@ import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
 import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
 import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;


@@ -79,7 +82,19 @@ public class EmptyIndexSegment implements ImmutableSegment {
   @Override
   public DataSource getDataSourceNullable(String column) {
     ColumnMetadata columnMetadata = _segmentMetadata.getColumnMetadataFor(column);
-    return columnMetadata != null ? new EmptyDataSource(columnMetadata) : null;
+    return columnMetadata != null ? new EmptyDataSource(columnMetadata.getFieldSpec()) : null;
+  }
+
+  @Override
+  public DataSource getDataSource(String column, Schema schema) {
+    DataSource dataSource = getDataSourceNullable(column);
+    if (dataSource != null) {
+      return dataSource;
+    }
+    FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+    Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in schema: %s", column,
+        schema.getSchemaName());
+    return new EmptyDataSource(fieldSpec);
   }

   @Nullable
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
index 28ea561e8a..d45dc61f3f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/immutable/ImmutableSegmentImpl.java
@@ -32,11 +32,13 @@ import javax.annotation.Nullable;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
+import org.apache.pinot.segment.local.indexsegment.IndexSegmentUtils;
 import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.index.map.ImmutableMapDataSource;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
+import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
 import org.apache.pinot.segment.local.startree.v2.store.StarTreeIndexContainer;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
 import org.apache.pinot.segment.spi.ColumnMetadata;
@@ -57,6 +59,7 @@ import org.apache.pinot.segment.spi.index.startree.StarTreeV2;
 import org.apache.pinot.segment.spi.store.SegmentDirectory;
 import org.apache.pinot.segment.spi.store.SegmentDirectoryPaths;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
@@ -234,6 +237,19 @@ public class ImmutableSegmentImpl implements ImmutableSegment {
     return _segmentMetadata;
   }

+  @Override
+  public DataSource getDataSource(String column, Schema schema) {
+    DataSource dataSource = getDataSourceNullable(column);
+    if (dataSource != null) {
+      return dataSource;
+    }
+    FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+    Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in schema: %s", column,
+        schema.getSchemaName());
+    return IndexSegmentUtils.createVirtualDataSource(
+        new VirtualColumnContext(fieldSpec, _segmentMetadata.getTotalDocs()));
+  }
+
   @Override
   public Set<String> getColumnNames() {
     return _segmentMetadata.getSchema().getColumnNames();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index ae1ffae0c3..5bf363228e 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -49,6 +49,7 @@ import org.apache.pinot.segment.local.aggregator.ValueAggregator;
 import org.apache.pinot.segment.local.aggregator.ValueAggregatorFactory;
 import org.apache.pinot.segment.local.dedup.DedupRecordInfo;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
+import org.apache.pinot.segment.local.indexsegment.IndexSegmentUtils;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentConfig;
 import org.apache.pinot.segment.local.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.segment.local.realtime.impl.dictionary.BaseOffHeapMutableDictionary;
@@ -56,14 +57,12 @@ import org.apache.pinot.segment.local.realtime.impl.dictionary.SameValueMutableD
 import org.apache.pinot.segment.local.realtime.impl.forward.FixedByteMVMutableForwardIndex;
 import org.apache.pinot.segment.local.realtime.impl.forward.SameValueMutableForwardIndex;
 import org.apache.pinot.segment.local.realtime.impl.nullvalue.MutableNullValueVector;
-import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
 import org.apache.pinot.segment.local.segment.index.datasource.MutableDataSource;
 import org.apache.pinot.segment.local.segment.index.dictionary.DictionaryIndexType;
 import org.apache.pinot.segment.local.segment.index.map.MutableMapDataSource;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
-import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProviderFactory;
 import org.apache.pinot.segment.local.upsert.ComparisonColumns;
 import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
@@ -1059,15 +1058,24 @@ public class MutableSegmentImpl implements MutableSegment {
     FieldSpec fieldSpec = _schema.getFieldSpecFor(column);
     if (fieldSpec != null && fieldSpec.isVirtualColumn()) {
       // Virtual column
-      // TODO: Refactor virtual column provider to directly generate data source
       VirtualColumnContext virtualColumnContext = new VirtualColumnContext(fieldSpec, _numDocsIndexed);
-      VirtualColumnProvider virtualColumnProvider = VirtualColumnProviderFactory.buildProvider(virtualColumnContext);
-      return new ImmutableDataSource(virtualColumnProvider.buildMetadata(virtualColumnContext),
-          virtualColumnProvider.buildColumnIndexContainer(virtualColumnContext));
+      return VirtualColumnProviderFactory.buildProvider(virtualColumnContext).buildDataSource(virtualColumnContext);
     }
     return null;
   }

+  @Override
+  public DataSource getDataSource(String column, Schema schema) {
+    DataSource dataSource = getDataSourceNullable(column);
+    if (dataSource != null) {
+      return dataSource;
+    }
+    FieldSpec fieldSpec = schema.getFieldSpecFor(column);
+    Preconditions.checkState(fieldSpec != null, "Failed to find column: %s in schema: %s", column,
+        schema.getSchemaName());
+    return IndexSegmentUtils.createVirtualDataSource(new VirtualColumnContext(fieldSpec, _numDocsIndexed));
+  }
+
   @Nullable
   @Override
   public List<StarTreeV2> getStarTrees() {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
index 282043693e..a9d5bc0bcc 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/column/DefaultNullValueVirtualColumnProvider.java
@@ -30,6 +30,7 @@ import org.apache.pinot.segment.local.segment.index.readers.constant.ConstantMVF
 import org.apache.pinot.segment.local.segment.index.readers.constant.ConstantMVInvertedIndexReader;
 import org.apache.pinot.segment.local.segment.index.readers.constant.ConstantSortedIndexReader;
 import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnContext;
+import org.apache.pinot.segment.local.segment.virtualcolumn.VirtualColumnProvider;
 import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -41,7 +42,7 @@ import org.apache.pinot.spi.utils.ByteArray;
 /**
  * Provide the default null value.
  */
-public class DefaultNullValueVirtualColumnProvider extends BaseVirtualColumnProvider {
+public class DefaultNullValueVirtualColumnProvider implements VirtualColumnProvider {

   @Override
   public ForwardIndexReader<?> buildForwardIndex(VirtualColumnContext context) {
@@ -86,8 +87,12 @@ public class DefaultNullValueVirtualColumnProv

   @Override
   public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
-    ColumnMetadataImpl.Builder builder = getColumnMetadataBuilder(context).setCardinality(1).setHasDictionary(true);
-    if (context.getFieldSpec().isSingleValueField()) {
+    FieldSpec fieldSpec = context.getFieldSpec();
+    ColumnMetadataImpl.Builder builder = new ColumnMetadataImpl.Builder().setFieldSpec(fieldSpec)
+        .setTotalDocs(context.getTotalDocCount())
+        .setCardinality(1)
+        .setHasDictionary(true);
+    if (fieldSpec.isSingleValueField()) {
       builder.setSorted(true);
     } else {
       // When there is no value for a multi-value column, the maxNumberOfMultiValues and cardinality should be
@@ -96,7 +101,6 @@ public class DefaultNullValueVirtualColumnProv
       builder.setMaxNumberOfMultiValues(1);
     }

-    FieldSpec fieldSpec = context.getFieldSpec();
     Object defaultNullValue = fieldSpec.getDefaultNullValue();
     switch (fieldSpec.getDataType().getStoredType()) {
       case INT:
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
index 46d8b64046..44c885605f 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/datasource/EmptyDataSource.java
@@ -20,7 +20,6 @@ package org.apache.pinot.segment.local.segment.index.datasource;

 import java.util.Set;
 import javax.annotation.Nullable;
-import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
@@ -32,15 +31,15 @@ import org.apache.pinot.spi.data.FieldSpec;
  */
 public class EmptyDataSource extends BaseDataSource {

-  public EmptyDataSource(ColumnMetadata columnMetadata) {
-    super(new EmptyDataSourceMetadata(columnMetadata), ColumnIndexContainer.Empty.INSTANCE);
+  public EmptyDataSource(FieldSpec fieldSpec) {
+    super(new EmptyDataSourceMetadata(fieldSpec), ColumnIndexContainer.Empty.INSTANCE);
   }

   private static class EmptyDataSourceMetadata implements DataSourceMetadata {
     final FieldSpec _fieldSpec;

-    EmptyDataSourceMetadata(ColumnMetadata columnMetadata) {
-      _fieldSpec = columnMetadata.getFieldSpec();
+    EmptyDataSourceMetadata(FieldSpec fieldSpec) {
+      _fieldSpec = fieldSpec;
     }

     @Override
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
index a1770ef05f..9cf98952b6 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/DocIdVirtualColumnProvider.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.segment.local.segment.virtualcolumn;

-import java.io.IOException;
-import org.apache.pinot.segment.local.segment.index.column.BaseVirtualColumnProvider;
 import org.apache.pinot.segment.local.segment.index.readers.DocIdDictionary;
 import org.apache.pinot.segment.spi.index.metadata.ColumnMetadataImpl;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -33,7 +31,7 @@ import org.apache.pinot.spi.utils.Pairs;
 /**
  * Virtual column provider that returns the document id.
  */
-public class DocIdVirtualColumnProvider extends BaseVirtualColumnProvider {
+public class DocIdVirtualColumnProvider implements VirtualColumnProvider {
   private static final DocIdSortedIndexReader DOC_ID_SORTED_INDEX_READER = new DocIdSortedIndexReader();

   @Override
@@ -47,15 +45,19 @@ public class DocIdVirtualColumnProvider extends BaseVirtualColumnProvider {
   }

   @Override
-  public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
-    ColumnMetadataImpl.Builder columnMetadataBuilder = super.getColumnMetadataBuilder(context);
-    columnMetadataBuilder.setCardinality(context.getTotalDocCount()).setSorted(true).setHasDictionary(true);
-    return columnMetadataBuilder.build();
+  public InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext context) {
+    return DOC_ID_SORTED_INDEX_READER;
   }

   @Override
-  public InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext context) {
-    return DOC_ID_SORTED_INDEX_READER;
+  public ColumnMetadataImpl buildMetadata(VirtualColumnContext context) {
+    int numDocs = context.getTotalDocCount();
+    return new ColumnMetadataImpl.Builder().setFieldSpec(context.getFieldSpec())
+        .setTotalDocs(numDocs)
+        .setCardinality(numDocs)
+        .setSorted(true)
+        .setHasDictionary(true)
+        .build();
   }

   private static class DocIdSortedIndexReader implements SortedIndexReader<ForwardIndexReaderContext> {
@@ -76,8 +78,7 @@ public class DocIdVirtualColumnProvider extends BaseVirtualColumnProvider {
     }

     @Override
-    public void close()
-        throws IOException {
+    public void close() {
     }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
index ee30e08184..043e50057c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProvider.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.segment.local.segment.virtualcolumn;

+import org.apache.pinot.segment.local.segment.index.datasource.ImmutableDataSource;
 import org.apache.pinot.segment.spi.ColumnMetadata;
+import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
@@ -30,13 +32,21 @@ import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader;
  * comprise a proper column.
  */
 public interface VirtualColumnProvider {
+
   ForwardIndexReader<?> buildForwardIndex(VirtualColumnContext context);

   Dictionary buildDictionary(VirtualColumnContext context);

+  InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext context);
+
   ColumnMetadata buildMetadata(VirtualColumnContext context);

-  InvertedIndexReader<?> buildInvertedIndex(VirtualColumnContext context);
+  default ColumnIndexContainer buildColumnIndexContainer(VirtualColumnContext context) {
+    return new VirtualColumnIndexContainer(buildForwardIndex(context), buildInvertedIndex(context),
+        buildDictionary(context));
+  }

-  ColumnIndexContainer buildColumnIndexContainer(VirtualColumnContext context);
+  default DataSource buildDataSource(VirtualColumnContext context) {
+    return new ImmutableDataSource(buildMetadata(context), buildColumnIndexContainer(context));
+  }
 }
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
index 4fdf1d6fab..c045dbd03d 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/IndexSegment.java
@@ -62,10 +62,10 @@ public interface IndexSegment {
   Set<String> getPhysicalColumnNames();

   /// Returns the [DataSource] for the given column.
-  /// TODO: Revisit all usage of this method to support virtual [DataSource].
+  /// This api is used when the column is guaranteed to exist in the segment.
   default DataSource getDataSource(String column) {
     DataSource dataSource = getDataSourceNullable(column);
-    Preconditions.checkState(dataSource != null, "Failed to find data source for column: ", column);
+    Preconditions.checkState(dataSource != null, "Failed to find data source for column: %s", column);
     return dataSource;
   }

@@ -76,10 +76,7 @@ public interface IndexSegment {
   /// Returns the [DataSource] for the given column, or creates a virtual one if it doesn't exist. The passed in
   /// [Schema] should be the latest schema of the table, not the one from [SegmentMetadata], and should contain the
   /// asked column.
-  /// TODO: Add support for virtual [DataSource].
-  default DataSource getDataSource(String column, Schema schema) {
-    return getDataSource(column);
-  }
+  DataSource getDataSource(String column, Schema schema);

   /**
    * Returns a list of star-trees (V2), or null if there is no star-tree (V2) in the segment.

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
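For reviewers skimming the patch, a minimal sketch of the call pattern this commit enables follows. IndexSegment, DataSource, and Schema are the types touched by the diff above; the sketch class, its method name, and the assumption that the caller supplies the latest table schema (e.g. via QueryContext) are illustrative, not code from the commit.

    import org.apache.pinot.segment.spi.IndexSegment;
    import org.apache.pinot.segment.spi.datasource.DataSource;
    import org.apache.pinot.spi.data.Schema;

    final class VirtualDataSourceSketch {
      private VirtualDataSourceSketch() {
      }

      // Hypothetical helper: resolve a column the way the reworked pruners do.
      static DataSource resolveForPruning(IndexSegment segment, Schema latestTableSchema, String column) {
        // Before this commit, pruners called segment.getDataSource(column), which requires the column
        // to physically exist in the segment; segments missing a newly added column were counted as
        // invalid and dropped. The two-argument overload introduced here instead falls back to a
        // virtual DataSource built from the latest table schema's FieldSpec, so a column that was
        // added to the schema but not yet reloaded into the segment is answered with default null
        // values instead of failing.
        return segment.getDataSource(column, latestTableSchema);
      }
    }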