This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 81028ce Enhance star-tree to skip matching-all predicate on non-star-tree dimension (#6109) 81028ce is described below commit 81028ceedec3b07feeb383063599c37dfd9b59a5 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Oct 7 22:15:18 2020 -0700 Enhance star-tree to skip matching-all predicate on non-star-tree dimension (#6109) For #6093 Currently in order to use the star-tree index, all the predicates must be applied to the star-tree dimensions. This could limit the usage of star-tree when a predicate is not applied to the star-tree dimensions, but can be skipped because it matches all the records. This is especially common for the implicit predicate applied to the time column for hybrid tables. This PR enhances the planning phase for star-tree to skip the predicates that match all the records, so that star-tree can be applied to the use cases described above. --- .../plan/AggregationGroupByOrderByPlanNode.java | 34 ++--- .../core/plan/AggregationGroupByPlanNode.java | 34 ++--- .../pinot/core/plan/AggregationPlanNode.java | 36 ++--- .../apache/pinot/core/startree/StarTreeUtils.java | 147 +++++++++++++++------ .../startree/operator/StarTreeFilterOperator.java | 96 +++----------- .../startree/plan/StarTreeDocIdSetPlanNode.java | 8 +- .../core/startree/plan/StarTreeFilterPlanNode.java | 11 +- .../startree/plan/StarTreeProjectionPlanNode.java | 10 +- .../startree/plan/StarTreeTransformPlanNode.java | 7 +- .../pinot/core/startree/v2/BaseStarTreeV2Test.java | 30 ++--- .../tests/OfflineClusterIntegrationTest.java | 12 ++ 11 files changed, 208 insertions(+), 217 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java index 5cb94bd..892a96d 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java @@ -19,13 +19,14 @@ package org.apache.pinot.core.plan; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.StarTreeUtils; import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode; @@ -59,32 +60,21 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode { _groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]); List<StarTreeV2> starTrees = indexSegment.getStarTrees(); - if (starTrees != null) { - if (!StarTreeUtils.isStarTreeDisabled(queryContext)) { - int numAggregationFunctions = _aggregationFunctions.length; - AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = - new AggregationFunctionColumnPair[numAggregationFunctions]; - boolean hasUnsupportedAggregationFunction = false; - for (int i = 0; i < numAggregationFunctions; i++) { - AggregationFunctionColumnPair aggregationFunctionColumnPair = - AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]); - if (aggregationFunctionColumnPair != null) { - aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair; - } else { - hasUnsupportedAggregationFunction = true; - break; - } - } - if (!hasUnsupportedAggregationFunction) { - 
FilterContext filter = queryContext.getFilter(); + if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) { + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions); + if (aggregationFunctionColumnPairs != null) { + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = + StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter()); + if (predicateEvaluatorsMap != null) { for (StarTreeV2 starTreeV2 : starTrees) { if (StarTreeUtils .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, _groupByExpressions, - filter)) { + predicateEvaluatorsMap.keySet())) { _transformPlanNode = null; _starTreeTransformPlanNode = - new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, filter, - queryContext.getDebugOptions()); + new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, + predicateEvaluatorsMap, queryContext.getDebugOptions()); return; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java index acce657..e2b3ea8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java @@ -19,13 +19,14 @@ package org.apache.pinot.core.plan; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.query.AggregationGroupByOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import 
org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.StarTreeUtils; import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode; @@ -59,32 +60,21 @@ public class AggregationGroupByPlanNode implements PlanNode { _groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]); List<StarTreeV2> starTrees = indexSegment.getStarTrees(); - if (starTrees != null) { - if (!StarTreeUtils.isStarTreeDisabled(queryContext)) { - int numAggregationFunctions = _aggregationFunctions.length; - AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = - new AggregationFunctionColumnPair[numAggregationFunctions]; - boolean hasUnsupportedAggregationFunction = false; - for (int i = 0; i < numAggregationFunctions; i++) { - AggregationFunctionColumnPair aggregationFunctionColumnPair = - AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]); - if (aggregationFunctionColumnPair != null) { - aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair; - } else { - hasUnsupportedAggregationFunction = true; - break; - } - } - if (!hasUnsupportedAggregationFunction) { - FilterContext filter = queryContext.getFilter(); + if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) { + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions); + if (aggregationFunctionColumnPairs != null) { + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = + StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter()); + if (predicateEvaluatorsMap != null) { for (StarTreeV2 starTreeV2 : starTrees) { if (StarTreeUtils .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, _groupByExpressions, - filter)) { + 
predicateEvaluatorsMap.keySet())) { _transformPlanNode = null; _starTreeTransformPlanNode = - new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, filter, - queryContext.getDebugOptions()); + new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, + predicateEvaluatorsMap, queryContext.getDebugOptions()); return; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java index 4a267a2..a224125 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java @@ -19,13 +19,14 @@ package org.apache.pinot.core.plan; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.query.AggregationOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.StarTreeUtils; import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode; @@ -50,31 +51,20 @@ public class AggregationPlanNode implements PlanNode { assert _aggregationFunctions != null; List<StarTreeV2> starTrees = indexSegment.getStarTrees(); - if (starTrees != null) { - if (!StarTreeUtils.isStarTreeDisabled(queryContext)) { - int numAggregationFunctions = _aggregationFunctions.length; - AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = - new 
AggregationFunctionColumnPair[numAggregationFunctions]; - boolean hasUnsupportedAggregationFunction = false; - for (int i = 0; i < numAggregationFunctions; i++) { - AggregationFunctionColumnPair aggregationFunctionColumnPair = - AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]); - if (aggregationFunctionColumnPair != null) { - aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair; - } else { - hasUnsupportedAggregationFunction = true; - break; - } - } - if (!hasUnsupportedAggregationFunction) { - FilterContext filter = queryContext.getFilter(); + if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) { + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions); + if (aggregationFunctionColumnPairs != null) { + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = + StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter()); + if (predicateEvaluatorsMap != null) { for (StarTreeV2 starTreeV2 : starTrees) { - if (StarTreeUtils - .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null, filter)) { + if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null, + predicateEvaluatorsMap.keySet())) { _transformPlanNode = null; _starTreeTransformPlanNode = - new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null, filter, - queryContext.getDebugOptions()); + new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null, + predicateEvaluatorsMap, queryContext.getDebugOptions()); return; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java index ed98e98..c7340b2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java @@ -20,18 +20,30 @@ package org.apache.pinot.core.startree; import java.io.File; import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import javax.annotation.Nullable; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; +import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.predicate.Predicate; import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.core.segment.store.SegmentDirectoryPaths; import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair; import org.apache.pinot.core.startree.v2.StarTreeV2Constants; @@ -40,6 +52,7 @@ import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig; import org.apache.pinot.spi.env.CommonsConfigurationUtils; +@SuppressWarnings("rawtypes") public class StarTreeUtils { private StarTreeUtils() { } @@ -55,17 +68,103 @@ public class StarTreeUtils { } /** + * Extracts the {@link AggregationFunctionColumnPair}s from the given {@link AggregationFunction}s. 
Returns + * {@code null} if any {@link AggregationFunction} cannot be represented as an {@link AggregationFunctionColumnPair} + * (e.g. has multiple arguments, argument is not column etc.). + */ + @Nullable + public static AggregationFunctionColumnPair[] extractAggregationFunctionPairs( + AggregationFunction[] aggregationFunctions) { + int numAggregationFunctions = aggregationFunctions.length; + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + new AggregationFunctionColumnPair[numAggregationFunctions]; + for (int i = 0; i < numAggregationFunctions; i++) { + AggregationFunctionColumnPair aggregationFunctionColumnPair = + AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunctions[i]); + if (aggregationFunctionColumnPair != null) { + aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair; + } else { + return null; + } + } + return aggregationFunctionColumnPairs; + } + + /** + * Extracts a map from the column to a list of {@link PredicateEvaluator}s for it. Returns {@code null} if the filter + * cannot be solved by the star-tree. 
+ */ + @Nullable + public static Map<String, List<PredicateEvaluator>> extractPredicateEvaluatorsMap(IndexSegment indexSegment, + @Nullable FilterContext filter) { + if (filter == null) { + return Collections.emptyMap(); + } + + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = new HashMap<>(); + Queue<FilterContext> queue = new LinkedList<>(); + queue.add(filter); + FilterContext filterNode; + while ((filterNode = queue.poll()) != null) { + switch (filterNode.getType()) { + case AND: + queue.addAll(filterNode.getChildren()); + break; + case OR: + // Star-tree does not support OR filter + return null; + case PREDICATE: + Predicate predicate = filterNode.getPredicate(); + ExpressionContext lhs = predicate.getLhs(); + if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) { + // Star-tree does not support non-identifier expression + return null; + } + String column = lhs.getIdentifier(); + DataSource dataSource = indexSegment.getDataSource(column); + Dictionary dictionary = dataSource.getDictionary(); + if (dictionary == null) { + // Star-tree does not support non-dictionary encoded dimension + return null; + } + switch (predicate.getType()) { + // Do not use star-tree for the following predicates because: + // - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids + // - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids + case REGEXP_LIKE: + case TEXT_MATCH: + case IS_NULL: + case IS_NOT_NULL: + return null; + } + PredicateEvaluator predicateEvaluator = PredicateEvaluatorProvider + .getPredicateEvaluator(predicate, dictionary, dataSource.getDataSourceMetadata().getDataType()); + if (predicateEvaluator.isAlwaysFalse()) { + // Do not use star-tree if there is no matching record + return null; + } + if (!predicateEvaluator.isAlwaysTrue()) { + predicateEvaluatorsMap.computeIfAbsent(column, k -> new ArrayList<>()).add(predicateEvaluator); + } + break; + default: + throw new IllegalStateException(); 
+ } + } + return predicateEvaluatorsMap; + } + + /** * Returns whether the query is fit for star tree index. * <p>The query is fit for star tree index if the following conditions are met: * <ul> * <li>Star-tree contains all aggregation function column pairs</li> * <li>All predicate columns and group-by columns are star-tree dimensions</li> - * <li>All predicates are conjoined by AND</li> * </ul> */ public static boolean isFitForStarTree(StarTreeV2Metadata starTreeV2Metadata, AggregationFunctionColumnPair[] aggregationFunctionColumnPairs, @Nullable ExpressionContext[] groupByExpressions, - @Nullable FilterContext filter) { + Set<String> predicateColumns) { // Check aggregations for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) { if (!starTreeV2Metadata.containsFunctionColumnPair(aggregationFunctionColumnPair)) { @@ -73,8 +172,9 @@ public class StarTreeUtils { } } - // Check group-by expressions Set<String> starTreeDimensions = new HashSet<>(starTreeV2Metadata.getDimensionsSplitOrder()); + + // Check group-by expressions if (groupByExpressions != null) { Set<String> groupByColumns = new HashSet<>(); for (ExpressionContext groupByExpression : groupByExpressions) { @@ -85,45 +185,8 @@ public class StarTreeUtils { } } - // Check filters - return filter == null || checkFilters(filter, starTreeDimensions); - } - - /** - * Helper method to check whether all columns in predicates are star-tree dimensions, and all predicates are - * conjoined by AND. 
- */ - private static boolean checkFilters(FilterContext filter, Set<String> starTreeDimensions) { - switch (filter.getType()) { - case AND: - for (FilterContext child : filter.getChildren()) { - if (!checkFilters(child, starTreeDimensions)) { - return false; - } - } - return true; - case OR: - return false; - case PREDICATE: - Predicate predicate = filter.getPredicate(); - ExpressionContext lhs = predicate.getLhs(); - if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) { - return false; - } - switch (predicate.getType()) { - // NOTE: Do not use star-tree for the following predicates because: - // - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids - // - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids - case REGEXP_LIKE: - case TEXT_MATCH: - case IS_NULL: - case IS_NOT_NULL: - return false; - } - return starTreeDimensions.contains(lhs.getIdentifier()); - default: - throw new IllegalStateException(); - } + // Check predicate columns + return starTreeDimensions.containsAll(predicateColumns); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java index 32d0b57..ae65720 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java @@ -41,9 +41,6 @@ import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator; import org.apache.pinot.core.operator.filter.EmptyFilterOperator; import org.apache.pinot.core.operator.filter.FilterOperatorUtils; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; -import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; -import org.apache.pinot.core.query.request.context.FilterContext; -import 
org.apache.pinot.core.query.request.context.predicate.Predicate; import org.apache.pinot.core.startree.StarTree; import org.apache.pinot.core.startree.StarTreeNode; import org.apache.pinot.core.startree.v2.StarTreeV2; @@ -121,83 +118,29 @@ public class StarTreeFilterOperator extends BaseFilterOperator { // Star-tree private final StarTreeV2 _starTreeV2; - // Set of group-by columns - private final Set<String> _groupByColumns; // Map from column to predicate evaluators private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap; - // Map from column to matching dictionary ids - private final Map<String, IntSet> _matchingDictIdsMap; + // Set of group-by columns + private final Set<String> _groupByColumns; private final Map<String, String> _debugOptions; boolean _resultEmpty = false; - public StarTreeFilterOperator(StarTreeV2 starTreeV2, @Nullable FilterContext filter, - @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { + public StarTreeFilterOperator(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, + Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { _starTreeV2 = starTreeV2; - _groupByColumns = groupByColumns != null ? 
new HashSet<>(groupByColumns) : Collections.emptySet(); + _predicateEvaluatorsMap = predicateEvaluatorsMap; _debugOptions = debugOptions; - if (filter != null) { - _predicateEvaluatorsMap = new HashMap<>(); - _matchingDictIdsMap = new HashMap<>(); - - // Process the filter tree and get a map from column to a list of predicates applied to it - Map<String, List<Predicate>> predicatesMap = getPredicatesMap(filter); - - // Initialize the predicate evaluators map - for (Map.Entry<String, List<Predicate>> entry : predicatesMap.entrySet()) { - String columnName = entry.getKey(); - List<Predicate> predicates = entry.getValue(); - List<PredicateEvaluator> predicateEvaluators = new ArrayList<>(); - - DataSource dataSource = starTreeV2.getDataSource(columnName); - for (Predicate predicate : predicates) { - PredicateEvaluator predicateEvaluator = PredicateEvaluatorProvider - .getPredicateEvaluator(predicate, dataSource.getDictionary(), - dataSource.getDataSourceMetadata().getDataType()); - // If predicate is always evaluated false, the result for the filter operator will be empty, early terminate - if (predicateEvaluator.isAlwaysFalse()) { - _resultEmpty = true; - return; - } else if (!predicateEvaluator.isAlwaysTrue()) { - predicateEvaluators.add(predicateEvaluator); - } - } - if (!predicateEvaluators.isEmpty()) { - _predicateEvaluatorsMap.put(columnName, predicateEvaluators); - } - } - + if (groupByColumns != null) { + _groupByColumns = new HashSet<>(groupByColumns); // Remove columns with predicates from group-by columns because we won't use star node for that column _groupByColumns.removeAll(_predicateEvaluatorsMap.keySet()); } else { - _predicateEvaluatorsMap = Collections.emptyMap(); - _matchingDictIdsMap = Collections.emptyMap(); + _groupByColumns = Collections.emptySet(); } } - /** - * Helper method to process the filter tree and get a map from column to a list of predicates applied to it. 
- */ - private Map<String, List<Predicate>> getPredicatesMap(FilterContext filter) { - Map<String, List<Predicate>> predicatesMap = new HashMap<>(); - Queue<FilterContext> queue = new LinkedList<>(); - queue.add(filter); - - while (!queue.isEmpty()) { - FilterContext filterNode = queue.remove(); - if (filterNode.getType() == FilterContext.Type.AND) { - queue.addAll(filterNode.getChildren()); - } else { - Predicate predicate = filterNode.getPredicate(); - String columnName = predicate.getLhs().getIdentifier(); - predicatesMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(predicate); - } - } - - return predicatesMap; - } - @Override public FilterBlock getNextBlock() { if (_resultEmpty) { @@ -252,13 +195,14 @@ public class StarTreeFilterOperator extends BaseFilterOperator { /** * Helper method to traverse the star tree, get matching documents and keep track of all the predicate columns that - * are not matched. - * <p>Return <code>null</code> if no matching dictionary id found for a column (i.e. the result for the filter - * operator is empty). + * are not matched. Returns {@code null} if no matching dictionary id found for a column (i.e. the result for the + * filter operator is empty). 
*/ + @Nullable private StarTreeResult traverseStarTree() { - MutableRoaringBitmap matchedDocIds = new MutableRoaringBitmap(); + MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap(); Set<String> remainingPredicateColumns = new HashSet<>(); + Map<String, IntSet> matchingDictIdsMap = new HashMap<>(); StarTree starTree = _starTreeV2.getStarTree(); List<String> dimensionNames = starTree.getDimensionNames(); @@ -267,19 +211,19 @@ public class StarTreeFilterOperator extends BaseFilterOperator { // Use BFS to traverse the star tree Queue<SearchEntry> queue = new LinkedList<>(); queue.add(new SearchEntry(starTreeRootNode, _predicateEvaluatorsMap.keySet(), _groupByColumns)); - while (!queue.isEmpty()) { - SearchEntry searchEntry = queue.remove(); + SearchEntry searchEntry; + while ((searchEntry = queue.poll()) != null) { StarTreeNode starTreeNode = searchEntry._starTreeNode; // If all predicate columns and group-by columns are matched, we can use aggregated document if (searchEntry._remainingPredicateColumns.isEmpty() && searchEntry._remainingGroupByColumns.isEmpty()) { - matchedDocIds.add(starTreeNode.getAggregatedDocId()); + matchingDocIds.add(starTreeNode.getAggregatedDocId()); } else { // For leaf node, because we haven't exhausted all predicate columns and group-by columns, we cannot use // the aggregated document. 
Add the range of documents for this node to the bitmap, and keep track of the // remaining predicate columns for this node if (starTreeNode.isLeaf()) { - matchedDocIds.add(starTreeNode.getStartDocId(), starTreeNode.getEndDocId()); + matchingDocIds.add((long) starTreeNode.getStartDocId(), starTreeNode.getEndDocId()); remainingPredicateColumns.addAll(searchEntry._remainingPredicateColumns); } else { // For non-leaf node, proceed to next level @@ -290,7 +234,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator { Set<String> newRemainingPredicateColumns = new HashSet<>(searchEntry._remainingPredicateColumns); newRemainingPredicateColumns.remove(nextDimension); - IntSet matchingDictIds = _matchingDictIdsMap.get(nextDimension); + IntSet matchingDictIds = matchingDictIdsMap.get(nextDimension); if (matchingDictIds == null) { matchingDictIds = getMatchingDictIds(_predicateEvaluatorsMap.get(nextDimension)); @@ -299,7 +243,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator { return null; } - _matchingDictIdsMap.put(nextDimension, matchingDictIds); + matchingDictIdsMap.put(nextDimension, matchingDictIds); } int numMatchingDictIds = matchingDictIds.size(); @@ -358,7 +302,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator { } } - return new StarTreeResult(matchedDocIds, remainingPredicateColumns); + return new StarTreeResult(matchingDocIds, remainingPredicateColumns); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java index c418d21..4fe485d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java @@ -18,22 +18,24 @@ */ package org.apache.pinot.core.startree.plan; +import java.util.List; import java.util.Map; import java.util.Set; import 
javax.annotation.Nullable; import org.apache.pinot.core.operator.DocIdSetOperator; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.core.plan.PlanNode; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.startree.v2.StarTreeV2; public class StarTreeDocIdSetPlanNode implements PlanNode { private final StarTreeFilterPlanNode _starTreeFilterPlanNode; - public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2, @Nullable FilterContext filter, + public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { - _starTreeFilterPlanNode = new StarTreeFilterPlanNode(starTreeV2, filter, groupByColumns, debugOptions); + _starTreeFilterPlanNode = + new StarTreeFilterPlanNode(starTreeV2, predicateEvaluatorsMap, groupByColumns, debugOptions); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java index 9628086..9c2efba 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java @@ -18,31 +18,32 @@ */ package org.apache.pinot.core.startree.plan; +import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.plan.PlanNode; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.startree.operator.StarTreeFilterOperator; import org.apache.pinot.core.startree.v2.StarTreeV2; public class StarTreeFilterPlanNode implements PlanNode { private final 
StarTreeV2 _starTreeV2; - private final FilterContext _filter; + private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap; private final Set<String> _groupByColumns; private final Map<String, String> _debugOptions; - public StarTreeFilterPlanNode(StarTreeV2 starTreeV2, @Nullable FilterContext filter, + public StarTreeFilterPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { _starTreeV2 = starTreeV2; - _filter = filter; + _predicateEvaluatorsMap = predicateEvaluatorsMap; _groupByColumns = groupByColumns; _debugOptions = debugOptions; } @Override public StarTreeFilterOperator run() { - return new StarTreeFilterOperator(_starTreeV2, _filter, _groupByColumns, _debugOptions); + return new StarTreeFilterOperator(_starTreeV2, _predicateEvaluatorsMap, _groupByColumns, _debugOptions); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java index e6d482d..e54c96d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java @@ -19,13 +19,14 @@ package org.apache.pinot.core.startree.plan; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.core.common.DataSource; import org.apache.pinot.core.operator.ProjectionOperator; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.plan.PlanNode; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.startree.v2.StarTreeV2; @@ -34,13 +35,14 @@ public class StarTreeProjectionPlanNode implements PlanNode { private final 
StarTreeDocIdSetPlanNode _starTreeDocIdSetPlanNode; public StarTreeProjectionPlanNode(StarTreeV2 starTreeV2, Set<String> projectionColumns, - @Nullable FilterContext filter, @Nullable Set<String> groupByColumns, + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { - _dataSourceMap = new HashMap<>(projectionColumns.size()); + _dataSourceMap = new HashMap<>(); for (String projectionColumn : projectionColumns) { _dataSourceMap.put(projectionColumn, starTreeV2.getDataSource(projectionColumn)); } - _starTreeDocIdSetPlanNode = new StarTreeDocIdSetPlanNode(starTreeV2, filter, groupByColumns, debugOptions); + _starTreeDocIdSetPlanNode = + new StarTreeDocIdSetPlanNode(starTreeV2, predicateEvaluatorsMap, groupByColumns, debugOptions); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java index 8f50fa5..7c484cf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java @@ -25,10 +25,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.plan.PlanNode; import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair; import org.apache.pinot.core.startree.v2.StarTreeV2; @@ -39,7 +39,7 @@ public class StarTreeTransformPlanNode implements PlanNode { public StarTreeTransformPlanNode(StarTreeV2 starTreeV2, 
AggregationFunctionColumnPair[] aggregationFunctionColumnPairs, @Nullable ExpressionContext[] groupByExpressions, - @Nullable FilterContext filter, @Nullable Map<String, String> debugOptions) { + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Map<String, String> debugOptions) { Set<String> projectionColumns = new HashSet<>(); for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) { projectionColumns.add(aggregationFunctionColumnPair.toColumnName()); @@ -57,7 +57,8 @@ public class StarTreeTransformPlanNode implements PlanNode { groupByColumns = null; } _starTreeProjectionPlanNode = - new StarTreeProjectionPlanNode(starTreeV2, projectionColumns, filter, groupByColumns, debugOptions); + new StarTreeProjectionPlanNode(starTreeV2, projectionColumns, predicateEvaluatorsMap, groupByColumns, + debugOptions); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java index 7b46277..4ea61cf 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java @@ -40,10 +40,10 @@ import org.apache.pinot.core.data.readers.GenericRowRecordReader; import org.apache.pinot.core.indexsegment.IndexSegment; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.plan.FilterPlanNode; import org.apache.pinot.core.plan.PlanNode; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.ExpressionContext; import 
org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; @@ -52,6 +52,7 @@ import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.core.segment.index.readers.ForwardIndexReader; import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext; +import org.apache.pinot.core.startree.StarTreeUtils; import org.apache.pinot.core.startree.plan.StarTreeFilterPlanNode; import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder; import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder.BuildMode; @@ -185,15 +186,11 @@ abstract class BaseStarTreeV2Test<R, A> { // Aggregations AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions(); - assert aggregationFunctions != null; + assertNotNull(aggregationFunctions); int numAggregations = aggregationFunctions.length; - List<AggregationFunctionColumnPair> functionColumnPairs = new ArrayList<>(numAggregations); - for (AggregationFunction aggregationFunction : aggregationFunctions) { - AggregationFunctionColumnPair aggregationFunctionColumnPair = - AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunction); - assertNotNull(aggregationFunctionColumnPair); - functionColumnPairs.add(aggregationFunctionColumnPair); - } + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions); + assertNotNull(aggregationFunctionColumnPairs); // Group-by columns Set<String> groupByColumnSet = new HashSet<>(); @@ -208,16 +205,15 @@ abstract class BaseStarTreeV2Test<R, A> { // Filter FilterContext filter = queryContext.getFilter(); + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = + StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, filter); + assertNotNull(predicateEvaluatorsMap); // Extract values 
with star-tree - PlanNode starTreeFilterPlanNode; - if (groupByColumns.isEmpty()) { - starTreeFilterPlanNode = new StarTreeFilterPlanNode(_starTreeV2, filter, null, null); - } else { - starTreeFilterPlanNode = new StarTreeFilterPlanNode(_starTreeV2, filter, groupByColumnSet, null); - } + PlanNode starTreeFilterPlanNode = + new StarTreeFilterPlanNode(_starTreeV2, predicateEvaluatorsMap, groupByColumnSet, null); List<ForwardIndexReader> starTreeAggregationColumnReaders = new ArrayList<>(numAggregations); - for (AggregationFunctionColumnPair aggregationFunctionColumnPair : functionColumnPairs) { + for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) { starTreeAggregationColumnReaders .add(_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).getForwardIndex()); } @@ -232,7 +228,7 @@ abstract class BaseStarTreeV2Test<R, A> { PlanNode nonStarTreeFilterPlanNode = new FilterPlanNode(_indexSegment, queryContext); List<ForwardIndexReader> nonStarTreeAggregationColumnReaders = new ArrayList<>(numAggregations); List<Dictionary> nonStarTreeAggregationColumnDictionaries = new ArrayList<>(numAggregations); - for (AggregationFunctionColumnPair aggregationFunctionColumnPair : functionColumnPairs) { + for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) { if (aggregationFunctionColumnPair.getFunctionType() == AggregationFunctionType.COUNT) { nonStarTreeAggregationColumnReaders.add(null); nonStarTreeAggregationColumnDictionaries.add(null); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index f2bf7dc..0d3c3b0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -477,6 +477,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs); assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS); + // Should be able to use the star-tree with an additional match-all predicate on another dimension + firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1 + " AND DaysSinceEpoch > 16070"); + assertEquals(firstQueryResponse.get("aggregationResults").get(0).get("value").asInt(), firstQueryResult); + assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs); + assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS); + // Test the second query JsonNode secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2); int secondQueryResult = secondQueryResponse.get("aggregationResults").get(0).get("value").asInt(); @@ -518,6 +524,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs); assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS); + // Should be able to use the star-tree with an additional match-all predicate on another dimension + secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2 + " AND DaysSinceEpoch > 16070"); + assertEquals(secondQueryResponse.get("aggregationResults").get(0).get("value").asInt(), secondQueryResult); + assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs); + assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS); + // Remove the star-tree index config and trigger reload indexingConfig.setStarTreeIndexConfigs(null); updateTableConfig(tableConfig); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For 
additional commands, e-mail: commits-h...@pinot.apache.org