This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch enhance_star_tree in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 6519f22df04d8174715530f16c101ae86841024b Author: Xiaotian (Jackie) Jiang <jackie....@gmail.com> AuthorDate: Mon Oct 5 18:44:09 2020 -0700 Enhance star-tree to skip matching-all predicate on non-star-tree dimension --- .../plan/AggregationGroupByOrderByPlanNode.java | 34 ++--- .../core/plan/AggregationGroupByPlanNode.java | 34 ++--- .../pinot/core/plan/AggregationPlanNode.java | 36 ++--- .../apache/pinot/core/startree/StarTreeUtils.java | 147 +++++++++++++++------ .../startree/operator/StarTreeFilterOperator.java | 101 +++----------- .../startree/plan/StarTreeDocIdSetPlanNode.java | 8 +- .../core/startree/plan/StarTreeFilterPlanNode.java | 13 +- .../startree/plan/StarTreeProjectionPlanNode.java | 10 +- .../startree/plan/StarTreeTransformPlanNode.java | 9 +- .../pinot/core/startree/v2/BaseStarTreeV2Test.java | 30 ++--- .../tests/OfflineClusterIntegrationTest.java | 12 ++ 11 files changed, 210 insertions(+), 224 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java index 5cb94bd..892a96d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java @@ -19,13 +19,14 @@ package org.apache.pinot.core.plan; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.FilterContext; 
import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.StarTreeUtils; import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode; @@ -59,32 +60,21 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode { _groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]); List<StarTreeV2> starTrees = indexSegment.getStarTrees(); - if (starTrees != null) { - if (!StarTreeUtils.isStarTreeDisabled(queryContext)) { - int numAggregationFunctions = _aggregationFunctions.length; - AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = - new AggregationFunctionColumnPair[numAggregationFunctions]; - boolean hasUnsupportedAggregationFunction = false; - for (int i = 0; i < numAggregationFunctions; i++) { - AggregationFunctionColumnPair aggregationFunctionColumnPair = - AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]); - if (aggregationFunctionColumnPair != null) { - aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair; - } else { - hasUnsupportedAggregationFunction = true; - break; - } - } - if (!hasUnsupportedAggregationFunction) { - FilterContext filter = queryContext.getFilter(); + if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) { + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions); + if (aggregationFunctionColumnPairs != null) { + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = + StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter()); + if (predicateEvaluatorsMap != null) { for (StarTreeV2 starTreeV2 : starTrees) { if (StarTreeUtils .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, _groupByExpressions, - filter)) { + predicateEvaluatorsMap.keySet())) { _transformPlanNode = null; _starTreeTransformPlanNode = - new StarTreeTransformPlanNode(starTreeV2, 
aggregationFunctionColumnPairs, _groupByExpressions, filter, - queryContext.getDebugOptions()); + new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, + predicateEvaluatorsMap, queryContext.getDebugOptions()); return; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java index acce657..e2b3ea8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByPlanNode.java @@ -19,13 +19,14 @@ package org.apache.pinot.core.plan; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.query.AggregationGroupByOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.StarTreeUtils; import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode; @@ -59,32 +60,21 @@ public class AggregationGroupByPlanNode implements PlanNode { _groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]); List<StarTreeV2> starTrees = indexSegment.getStarTrees(); - if (starTrees != null) { - if (!StarTreeUtils.isStarTreeDisabled(queryContext)) { - int numAggregationFunctions = _aggregationFunctions.length; - AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = - new AggregationFunctionColumnPair[numAggregationFunctions]; - boolean 
hasUnsupportedAggregationFunction = false; - for (int i = 0; i < numAggregationFunctions; i++) { - AggregationFunctionColumnPair aggregationFunctionColumnPair = - AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]); - if (aggregationFunctionColumnPair != null) { - aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair; - } else { - hasUnsupportedAggregationFunction = true; - break; - } - } - if (!hasUnsupportedAggregationFunction) { - FilterContext filter = queryContext.getFilter(); + if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) { + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions); + if (aggregationFunctionColumnPairs != null) { + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = + StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter()); + if (predicateEvaluatorsMap != null) { for (StarTreeV2 starTreeV2 : starTrees) { if (StarTreeUtils .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, _groupByExpressions, - filter)) { + predicateEvaluatorsMap.keySet())) { _transformPlanNode = null; _starTreeTransformPlanNode = - new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, filter, - queryContext.getDebugOptions()); + new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, _groupByExpressions, + predicateEvaluatorsMap, queryContext.getDebugOptions()); return; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java index 4a267a2..a224125 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationPlanNode.java @@ -19,13 +19,14 @@ package org.apache.pinot.core.plan; import 
java.util.List; +import java.util.Map; import java.util.Set; import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.query.AggregationOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.StarTreeUtils; import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode; @@ -50,31 +51,20 @@ public class AggregationPlanNode implements PlanNode { assert _aggregationFunctions != null; List<StarTreeV2> starTrees = indexSegment.getStarTrees(); - if (starTrees != null) { - if (!StarTreeUtils.isStarTreeDisabled(queryContext)) { - int numAggregationFunctions = _aggregationFunctions.length; - AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = - new AggregationFunctionColumnPair[numAggregationFunctions]; - boolean hasUnsupportedAggregationFunction = false; - for (int i = 0; i < numAggregationFunctions; i++) { - AggregationFunctionColumnPair aggregationFunctionColumnPair = - AggregationFunctionUtils.getAggregationFunctionColumnPair(_aggregationFunctions[i]); - if (aggregationFunctionColumnPair != null) { - aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair; - } else { - hasUnsupportedAggregationFunction = true; - break; - } - } - if (!hasUnsupportedAggregationFunction) { - FilterContext filter = queryContext.getFilter(); + if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) { + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + StarTreeUtils.extractAggregationFunctionPairs(_aggregationFunctions); + if (aggregationFunctionColumnPairs != null) { + 
Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = + StarTreeUtils.extractPredicateEvaluatorsMap(indexSegment, queryContext.getFilter()); + if (predicateEvaluatorsMap != null) { for (StarTreeV2 starTreeV2 : starTrees) { - if (StarTreeUtils - .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null, filter)) { + if (StarTreeUtils.isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, null, + predicateEvaluatorsMap.keySet())) { _transformPlanNode = null; _starTreeTransformPlanNode = - new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null, filter, - queryContext.getDebugOptions()); + new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, null, + predicateEvaluatorsMap, queryContext.getDebugOptions()); return; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java index ed98e98..c7340b2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java @@ -20,18 +20,30 @@ package org.apache.pinot.core.startree; import java.io.File; import java.io.FileOutputStream; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Queue; import java.util.Set; import javax.annotation.Nullable; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; +import 
org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.predicate.Predicate; import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.core.segment.store.SegmentDirectoryPaths; import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair; import org.apache.pinot.core.startree.v2.StarTreeV2Constants; @@ -40,6 +52,7 @@ import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig; import org.apache.pinot.spi.env.CommonsConfigurationUtils; +@SuppressWarnings("rawtypes") public class StarTreeUtils { private StarTreeUtils() { } @@ -55,17 +68,103 @@ public class StarTreeUtils { } /** + * Extracts the {@link AggregationFunctionColumnPair}s from the given {@link AggregationFunction}s. Returns + * {@code null} if any {@link AggregationFunction} cannot be represented as an {@link AggregationFunctionColumnPair} + * (e.g. has multiple arguments, argument is not column etc.). 
+ */ + @Nullable + public static AggregationFunctionColumnPair[] extractAggregationFunctionPairs( + AggregationFunction[] aggregationFunctions) { + int numAggregationFunctions = aggregationFunctions.length; + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + new AggregationFunctionColumnPair[numAggregationFunctions]; + for (int i = 0; i < numAggregationFunctions; i++) { + AggregationFunctionColumnPair aggregationFunctionColumnPair = + AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunctions[i]); + if (aggregationFunctionColumnPair != null) { + aggregationFunctionColumnPairs[i] = aggregationFunctionColumnPair; + } else { + return null; + } + } + return aggregationFunctionColumnPairs; + } + + /** + * Extracts a map from the column to a list of {@link PredicateEvaluator}s for it. Returns {@code null} if the filter + * cannot be solved by the star-tree. + */ + @Nullable + public static Map<String, List<PredicateEvaluator>> extractPredicateEvaluatorsMap(IndexSegment indexSegment, + @Nullable FilterContext filter) { + if (filter == null) { + return Collections.emptyMap(); + } + + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = new HashMap<>(); + Queue<FilterContext> queue = new LinkedList<>(); + queue.add(filter); + FilterContext filterNode; + while ((filterNode = queue.poll()) != null) { + switch (filterNode.getType()) { + case AND: + queue.addAll(filterNode.getChildren()); + break; + case OR: + // Star-tree does not support OR filter + return null; + case PREDICATE: + Predicate predicate = filterNode.getPredicate(); + ExpressionContext lhs = predicate.getLhs(); + if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) { + // Star-tree does not support non-identifier expression + return null; + } + String column = lhs.getIdentifier(); + DataSource dataSource = indexSegment.getDataSource(column); + Dictionary dictionary = dataSource.getDictionary(); + if (dictionary == null) { + // Star-tree does not support 
non-dictionary encoded dimension + return null; + } + switch (predicate.getType()) { + // Do not use star-tree for the following predicates because: + // - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids + // - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids + case REGEXP_LIKE: + case TEXT_MATCH: + case IS_NULL: + case IS_NOT_NULL: + return null; + } + PredicateEvaluator predicateEvaluator = PredicateEvaluatorProvider + .getPredicateEvaluator(predicate, dictionary, dataSource.getDataSourceMetadata().getDataType()); + if (predicateEvaluator.isAlwaysFalse()) { + // Do not use star-tree if there is no matching record + return null; + } + if (!predicateEvaluator.isAlwaysTrue()) { + predicateEvaluatorsMap.computeIfAbsent(column, k -> new ArrayList<>()).add(predicateEvaluator); + } + break; + default: + throw new IllegalStateException(); + } + } + return predicateEvaluatorsMap; + } + + /** * Returns whether the query is fit for star tree index. 
* <p>The query is fit for star tree index if the following conditions are met: * <ul> * <li>Star-tree contains all aggregation function column pairs</li> * <li>All predicate columns and group-by columns are star-tree dimensions</li> - * <li>All predicates are conjoined by AND</li> * </ul> */ public static boolean isFitForStarTree(StarTreeV2Metadata starTreeV2Metadata, AggregationFunctionColumnPair[] aggregationFunctionColumnPairs, @Nullable ExpressionContext[] groupByExpressions, - @Nullable FilterContext filter) { + Set<String> predicateColumns) { // Check aggregations for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) { if (!starTreeV2Metadata.containsFunctionColumnPair(aggregationFunctionColumnPair)) { @@ -73,8 +172,9 @@ public class StarTreeUtils { } } - // Check group-by expressions Set<String> starTreeDimensions = new HashSet<>(starTreeV2Metadata.getDimensionsSplitOrder()); + + // Check group-by expressions if (groupByExpressions != null) { Set<String> groupByColumns = new HashSet<>(); for (ExpressionContext groupByExpression : groupByExpressions) { @@ -85,45 +185,8 @@ public class StarTreeUtils { } } - // Check filters - return filter == null || checkFilters(filter, starTreeDimensions); - } - - /** - * Helper method to check whether all columns in predicates are star-tree dimensions, and all predicates are - * conjoined by AND. 
- */ - private static boolean checkFilters(FilterContext filter, Set<String> starTreeDimensions) { - switch (filter.getType()) { - case AND: - for (FilterContext child : filter.getChildren()) { - if (!checkFilters(child, starTreeDimensions)) { - return false; - } - } - return true; - case OR: - return false; - case PREDICATE: - Predicate predicate = filter.getPredicate(); - ExpressionContext lhs = predicate.getLhs(); - if (lhs.getType() != ExpressionContext.Type.IDENTIFIER) { - return false; - } - switch (predicate.getType()) { - // NOTE: Do not use star-tree for the following predicates because: - // - REGEXP_LIKE: Need to scan the whole dictionary to gather the matching dictionary ids - // - TEXT_MATCH/IS_NULL/IS_NOT_NULL: No way to gather the matching dictionary ids - case REGEXP_LIKE: - case TEXT_MATCH: - case IS_NULL: - case IS_NOT_NULL: - return false; - } - return starTreeDimensions.contains(lhs.getIdentifier()); - default: - throw new IllegalStateException(); - } + // Check predicate columns + return starTreeDimensions.containsAll(predicateColumns); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java index 32d0b57..2037b71 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/operator/StarTreeFilterOperator.java @@ -22,7 +22,6 @@ import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import it.unimi.dsi.fastutil.ints.IntSet; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -41,9 +40,6 @@ import org.apache.pinot.core.operator.filter.BitmapBasedFilterOperator; import org.apache.pinot.core.operator.filter.EmptyFilterOperator; import 
org.apache.pinot.core.operator.filter.FilterOperatorUtils; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; -import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluatorProvider; -import org.apache.pinot.core.query.request.context.FilterContext; -import org.apache.pinot.core.query.request.context.predicate.Predicate; import org.apache.pinot.core.startree.StarTree; import org.apache.pinot.core.startree.StarTreeNode; import org.apache.pinot.core.startree.v2.StarTreeV2; @@ -121,81 +117,23 @@ public class StarTreeFilterOperator extends BaseFilterOperator { // Star-tree private final StarTreeV2 _starTreeV2; - // Set of group-by columns - private final Set<String> _groupByColumns; // Map from column to predicate evaluators private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap; - // Map from column to matching dictionary ids - private final Map<String, IntSet> _matchingDictIdsMap; + // Set of group-by columns + private final Set<String> _groupByColumns; private final Map<String, String> _debugOptions; boolean _resultEmpty = false; - public StarTreeFilterOperator(StarTreeV2 starTreeV2, @Nullable FilterContext filter, - @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { + public StarTreeFilterOperator(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, + Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { _starTreeV2 = starTreeV2; - _groupByColumns = groupByColumns != null ? 
new HashSet<>(groupByColumns) : Collections.emptySet(); + _predicateEvaluatorsMap = predicateEvaluatorsMap; + _groupByColumns = groupByColumns; _debugOptions = debugOptions; - if (filter != null) { - _predicateEvaluatorsMap = new HashMap<>(); - _matchingDictIdsMap = new HashMap<>(); - - // Process the filter tree and get a map from column to a list of predicates applied to it - Map<String, List<Predicate>> predicatesMap = getPredicatesMap(filter); - - // Initialize the predicate evaluators map - for (Map.Entry<String, List<Predicate>> entry : predicatesMap.entrySet()) { - String columnName = entry.getKey(); - List<Predicate> predicates = entry.getValue(); - List<PredicateEvaluator> predicateEvaluators = new ArrayList<>(); - - DataSource dataSource = starTreeV2.getDataSource(columnName); - for (Predicate predicate : predicates) { - PredicateEvaluator predicateEvaluator = PredicateEvaluatorProvider - .getPredicateEvaluator(predicate, dataSource.getDictionary(), - dataSource.getDataSourceMetadata().getDataType()); - // If predicate is always evaluated false, the result for the filter operator will be empty, early terminate - if (predicateEvaluator.isAlwaysFalse()) { - _resultEmpty = true; - return; - } else if (!predicateEvaluator.isAlwaysTrue()) { - predicateEvaluators.add(predicateEvaluator); - } - } - if (!predicateEvaluators.isEmpty()) { - _predicateEvaluatorsMap.put(columnName, predicateEvaluators); - } - } - - // Remove columns with predicates from group-by columns because we won't use star node for that column - _groupByColumns.removeAll(_predicateEvaluatorsMap.keySet()); - } else { - _predicateEvaluatorsMap = Collections.emptyMap(); - _matchingDictIdsMap = Collections.emptyMap(); - } - } - - /** - * Helper method to process the filter tree and get a map from column to a list of predicates applied to it. 
- */ - private Map<String, List<Predicate>> getPredicatesMap(FilterContext filter) { - Map<String, List<Predicate>> predicatesMap = new HashMap<>(); - Queue<FilterContext> queue = new LinkedList<>(); - queue.add(filter); - - while (!queue.isEmpty()) { - FilterContext filterNode = queue.remove(); - if (filterNode.getType() == FilterContext.Type.AND) { - queue.addAll(filterNode.getChildren()); - } else { - Predicate predicate = filterNode.getPredicate(); - String columnName = predicate.getLhs().getIdentifier(); - predicatesMap.computeIfAbsent(columnName, k -> new ArrayList<>()).add(predicate); - } - } - - return predicatesMap; + // Remove columns with predicates from group-by columns because we won't use star node for that column + _groupByColumns.removeAll(_predicateEvaluatorsMap.keySet()); } @Override @@ -252,13 +190,14 @@ public class StarTreeFilterOperator extends BaseFilterOperator { /** * Helper method to traverse the star tree, get matching documents and keep track of all the predicate columns that - * are not matched. - * <p>Return <code>null</code> if no matching dictionary id found for a column (i.e. the result for the filter - * operator is empty). + * are not matched. Returns {@code null} if no matching dictionary id found for a column (i.e. the result for the + * filter operator is empty). 
*/ + @Nullable private StarTreeResult traverseStarTree() { - MutableRoaringBitmap matchedDocIds = new MutableRoaringBitmap(); + MutableRoaringBitmap matchingDocIds = new MutableRoaringBitmap(); Set<String> remainingPredicateColumns = new HashSet<>(); + Map<String, IntSet> matchingDictIdsMap = new HashMap<>(); StarTree starTree = _starTreeV2.getStarTree(); List<String> dimensionNames = starTree.getDimensionNames(); @@ -267,19 +206,19 @@ public class StarTreeFilterOperator extends BaseFilterOperator { // Use BFS to traverse the star tree Queue<SearchEntry> queue = new LinkedList<>(); queue.add(new SearchEntry(starTreeRootNode, _predicateEvaluatorsMap.keySet(), _groupByColumns)); - while (!queue.isEmpty()) { - SearchEntry searchEntry = queue.remove(); + SearchEntry searchEntry; + while ((searchEntry = queue.poll()) != null) { StarTreeNode starTreeNode = searchEntry._starTreeNode; // If all predicate columns and group-by columns are matched, we can use aggregated document if (searchEntry._remainingPredicateColumns.isEmpty() && searchEntry._remainingGroupByColumns.isEmpty()) { - matchedDocIds.add(starTreeNode.getAggregatedDocId()); + matchingDocIds.add(starTreeNode.getAggregatedDocId()); } else { // For leaf node, because we haven't exhausted all predicate columns and group-by columns, we cannot use // the aggregated document. 
Add the range of documents for this node to the bitmap, and keep track of the // remaining predicate columns for this node if (starTreeNode.isLeaf()) { - matchedDocIds.add(starTreeNode.getStartDocId(), starTreeNode.getEndDocId()); + matchingDocIds.add((long) starTreeNode.getStartDocId(), starTreeNode.getEndDocId()); remainingPredicateColumns.addAll(searchEntry._remainingPredicateColumns); } else { // For non-leaf node, proceed to next level @@ -290,7 +229,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator { Set<String> newRemainingPredicateColumns = new HashSet<>(searchEntry._remainingPredicateColumns); newRemainingPredicateColumns.remove(nextDimension); - IntSet matchingDictIds = _matchingDictIdsMap.get(nextDimension); + IntSet matchingDictIds = matchingDictIdsMap.get(nextDimension); if (matchingDictIds == null) { matchingDictIds = getMatchingDictIds(_predicateEvaluatorsMap.get(nextDimension)); @@ -299,7 +238,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator { return null; } - _matchingDictIdsMap.put(nextDimension, matchingDictIds); + matchingDictIdsMap.put(nextDimension, matchingDictIds); } int numMatchingDictIds = matchingDictIds.size(); @@ -358,7 +297,7 @@ public class StarTreeFilterOperator extends BaseFilterOperator { } } - return new StarTreeResult(matchedDocIds, remainingPredicateColumns); + return new StarTreeResult(matchingDocIds, remainingPredicateColumns); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java index c418d21..4fe485d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeDocIdSetPlanNode.java @@ -18,22 +18,24 @@ */ package org.apache.pinot.core.startree.plan; +import java.util.List; import java.util.Map; import java.util.Set; import 
javax.annotation.Nullable; import org.apache.pinot.core.operator.DocIdSetOperator; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.core.plan.PlanNode; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.startree.v2.StarTreeV2; public class StarTreeDocIdSetPlanNode implements PlanNode { private final StarTreeFilterPlanNode _starTreeFilterPlanNode; - public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2, @Nullable FilterContext filter, + public StarTreeDocIdSetPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { - _starTreeFilterPlanNode = new StarTreeFilterPlanNode(starTreeV2, filter, groupByColumns, debugOptions); + _starTreeFilterPlanNode = + new StarTreeFilterPlanNode(starTreeV2, predicateEvaluatorsMap, groupByColumns, debugOptions); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java index 9628086..82b3391 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeFilterPlanNode.java @@ -18,31 +18,32 @@ */ package org.apache.pinot.core.startree.plan; +import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.plan.PlanNode; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.startree.operator.StarTreeFilterOperator; import org.apache.pinot.core.startree.v2.StarTreeV2; public class StarTreeFilterPlanNode implements PlanNode { private final 
StarTreeV2 _starTreeV2; - private final FilterContext _filter; + private final Map<String, List<PredicateEvaluator>> _predicateEvaluatorsMap; private final Set<String> _groupByColumns; private final Map<String, String> _debugOptions; - public StarTreeFilterPlanNode(StarTreeV2 starTreeV2, @Nullable FilterContext filter, - @Nullable Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { + public StarTreeFilterPlanNode(StarTreeV2 starTreeV2, Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, + Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { _starTreeV2 = starTreeV2; - _filter = filter; + _predicateEvaluatorsMap = predicateEvaluatorsMap; _groupByColumns = groupByColumns; _debugOptions = debugOptions; } @Override public StarTreeFilterOperator run() { - return new StarTreeFilterOperator(_starTreeV2, _filter, _groupByColumns, _debugOptions); + return new StarTreeFilterOperator(_starTreeV2, _predicateEvaluatorsMap, _groupByColumns, _debugOptions); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java index e6d482d..7857e57 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeProjectionPlanNode.java @@ -19,13 +19,14 @@ package org.apache.pinot.core.startree.plan; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.pinot.core.common.DataSource; import org.apache.pinot.core.operator.ProjectionOperator; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.plan.PlanNode; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.startree.v2.StarTreeV2; @@ -34,13 +35,14 
@@ public class StarTreeProjectionPlanNode implements PlanNode { private final StarTreeDocIdSetPlanNode _starTreeDocIdSetPlanNode; public StarTreeProjectionPlanNode(StarTreeV2 starTreeV2, Set<String> projectionColumns, - @Nullable FilterContext filter, @Nullable Set<String> groupByColumns, + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, Set<String> groupByColumns, @Nullable Map<String, String> debugOptions) { - _dataSourceMap = new HashMap<>(projectionColumns.size()); + _dataSourceMap = new HashMap<>(); for (String projectionColumn : projectionColumns) { _dataSourceMap.put(projectionColumn, starTreeV2.getDataSource(projectionColumn)); } - _starTreeDocIdSetPlanNode = new StarTreeDocIdSetPlanNode(starTreeV2, filter, groupByColumns, debugOptions); + _starTreeDocIdSetPlanNode = + new StarTreeDocIdSetPlanNode(starTreeV2, predicateEvaluatorsMap, groupByColumns, debugOptions); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java index 8f50fa5..6a316fd 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/plan/StarTreeTransformPlanNode.java @@ -25,10 +25,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.plan.PlanNode; import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair; import org.apache.pinot.core.startree.v2.StarTreeV2; @@ -39,7 +39,7 @@ public class StarTreeTransformPlanNode implements PlanNode { public 
StarTreeTransformPlanNode(StarTreeV2 starTreeV2, AggregationFunctionColumnPair[] aggregationFunctionColumnPairs, @Nullable ExpressionContext[] groupByExpressions, - @Nullable FilterContext filter, @Nullable Map<String, String> debugOptions) { + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap, @Nullable Map<String, String> debugOptions) { Set<String> projectionColumns = new HashSet<>(); for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) { projectionColumns.add(aggregationFunctionColumnPair.toColumnName()); @@ -54,10 +54,11 @@ public class StarTreeTransformPlanNode implements PlanNode { projectionColumns.addAll(groupByColumns); } else { _groupByExpressions = Collections.emptyList(); - groupByColumns = null; + groupByColumns = Collections.emptySet(); } _starTreeProjectionPlanNode = - new StarTreeProjectionPlanNode(starTreeV2, projectionColumns, filter, groupByColumns, debugOptions); + new StarTreeProjectionPlanNode(starTreeV2, projectionColumns, predicateEvaluatorsMap, groupByColumns, + debugOptions); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java index 7b46277..4ea61cf 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/startree/v2/BaseStarTreeV2Test.java @@ -40,10 +40,10 @@ import org.apache.pinot.core.data.readers.GenericRowRecordReader; import org.apache.pinot.core.indexsegment.IndexSegment; import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.core.plan.FilterPlanNode; import org.apache.pinot.core.plan.PlanNode; import 
org.apache.pinot.core.query.aggregation.function.AggregationFunction; -import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils; import org.apache.pinot.core.query.request.context.ExpressionContext; import org.apache.pinot.core.query.request.context.FilterContext; import org.apache.pinot.core.query.request.context.QueryContext; @@ -52,6 +52,7 @@ import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.core.segment.index.readers.ForwardIndexReader; import org.apache.pinot.core.segment.index.readers.ForwardIndexReaderContext; +import org.apache.pinot.core.startree.StarTreeUtils; import org.apache.pinot.core.startree.plan.StarTreeFilterPlanNode; import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder; import org.apache.pinot.core.startree.v2.builder.MultipleTreesBuilder.BuildMode; @@ -185,15 +186,11 @@ abstract class BaseStarTreeV2Test<R, A> { // Aggregations AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions(); - assert aggregationFunctions != null; + assertNotNull(aggregationFunctions); int numAggregations = aggregationFunctions.length; - List<AggregationFunctionColumnPair> functionColumnPairs = new ArrayList<>(numAggregations); - for (AggregationFunction aggregationFunction : aggregationFunctions) { - AggregationFunctionColumnPair aggregationFunctionColumnPair = - AggregationFunctionUtils.getAggregationFunctionColumnPair(aggregationFunction); - assertNotNull(aggregationFunctionColumnPair); - functionColumnPairs.add(aggregationFunctionColumnPair); - } + AggregationFunctionColumnPair[] aggregationFunctionColumnPairs = + StarTreeUtils.extractAggregationFunctionPairs(aggregationFunctions); + assertNotNull(aggregationFunctionColumnPairs); // Group-by columns Set<String> groupByColumnSet = new HashSet<>(); @@ -208,16 +205,15 @@ abstract class BaseStarTreeV2Test<R, A> { // Filter FilterContext 
filter = queryContext.getFilter(); + Map<String, List<PredicateEvaluator>> predicateEvaluatorsMap = + StarTreeUtils.extractPredicateEvaluatorsMap(_indexSegment, filter); + assertNotNull(predicateEvaluatorsMap); // Extract values with star-tree - PlanNode starTreeFilterPlanNode; - if (groupByColumns.isEmpty()) { - starTreeFilterPlanNode = new StarTreeFilterPlanNode(_starTreeV2, filter, null, null); - } else { - starTreeFilterPlanNode = new StarTreeFilterPlanNode(_starTreeV2, filter, groupByColumnSet, null); - } + PlanNode starTreeFilterPlanNode = + new StarTreeFilterPlanNode(_starTreeV2, predicateEvaluatorsMap, groupByColumnSet, null); List<ForwardIndexReader> starTreeAggregationColumnReaders = new ArrayList<>(numAggregations); - for (AggregationFunctionColumnPair aggregationFunctionColumnPair : functionColumnPairs) { + for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) { starTreeAggregationColumnReaders .add(_starTreeV2.getDataSource(aggregationFunctionColumnPair.toColumnName()).getForwardIndex()); } @@ -232,7 +228,7 @@ abstract class BaseStarTreeV2Test<R, A> { PlanNode nonStarTreeFilterPlanNode = new FilterPlanNode(_indexSegment, queryContext); List<ForwardIndexReader> nonStarTreeAggregationColumnReaders = new ArrayList<>(numAggregations); List<Dictionary> nonStarTreeAggregationColumnDictionaries = new ArrayList<>(numAggregations); - for (AggregationFunctionColumnPair aggregationFunctionColumnPair : functionColumnPairs) { + for (AggregationFunctionColumnPair aggregationFunctionColumnPair : aggregationFunctionColumnPairs) { if (aggregationFunctionColumnPair.getFunctionType() == AggregationFunctionType.COUNT) { nonStarTreeAggregationColumnReaders.add(null); nonStarTreeAggregationColumnDictionaries.add(null); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index f2bf7dc..0d3c3b0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -477,6 +477,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs); assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS); + // Should be able to use the star-tree with an additional match-all predicate on another dimension + firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1 + " AND DaysSinceEpoch > 16070"); + assertEquals(firstQueryResponse.get("aggregationResults").get(0).get("value").asInt(), firstQueryResult); + assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs); + assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS); + // Test the second query JsonNode secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2); int secondQueryResult = secondQueryResponse.get("aggregationResults").get(0).get("value").asInt(); @@ -518,6 +524,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs); assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS); + // Should be able to use the star-tree with an additional match-all predicate on another dimension + secondQueryResponse = postQuery(TEST_STAR_TREE_QUERY_2 + " AND DaysSinceEpoch > 16070"); + assertEquals(secondQueryResponse.get("aggregationResults").get(0).get("value").asInt(), secondQueryResult); + assertEquals(secondQueryResponse.get("totalDocs").asLong(), numTotalDocs); + assertEquals(secondQueryResponse.get("numDocsScanned").asInt(), NUM_SEGMENTS); + // Remove 
the star-tree index config and trigger reload indexingConfig.setStarTreeIndexConfigs(null); updateTableConfig(tableConfig); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org