This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 2a5d5b7 Introduce in-Segment Trim for GroupBy OrderBy Query (#6991) 2a5d5b7 is described below commit 2a5d5b7517a46ec76d73742b31c6d9e457920f21 Author: wuwenw <55009204+wuw...@users.noreply.github.com> AuthorDate: Thu Jun 10 17:15:06 2021 -0400 Introduce in-Segment Trim for GroupBy OrderBy Query (#6991) One of the major bottlenecks for the current GroupBy OrderBy query on high cardinality columns is the merge phase. Essentially every segment brings a large number of intermediate results to a global concurrent map for further aggregation and merge, which takes up a lot of space and is very time-consuming. This PR introduces an optimization option that each segment trims its intermediate results to a given size. The size is configurable by the user and is guaranteed to be max(limit N * [...] --- .../pinot/core/data/table/IntermediateRecord.java | 42 +++ .../apache/pinot/core/data/table/TableResizer.java | 58 +++-- .../operator/blocks/IntermediateResultsBlock.java | 29 ++- .../combine/GroupByOrderByCombineOperator.java | 37 ++- .../query/AggregationGroupByOrderByOperator.java | 29 ++- .../plan/AggregationGroupByOrderByPlanNode.java | 13 +- .../core/plan/maker/InstancePlanMakerImplV2.java | 40 ++- .../groupby/DefaultGroupByExecutor.java | 13 + .../groupby/DictionaryBasedGroupKeyGenerator.java | 36 +++ .../query/aggregation/groupby/GroupByExecutor.java | 18 ++ .../aggregation/groupby/GroupKeyGenerator.java | 5 + .../NoDictionaryMultiColumnGroupKeyGenerator.java | 5 + .../NoDictionarySingleColumnGroupKeyGenerator.java | 5 + .../org/apache/pinot/core/util/GroupByUtils.java | 15 +- .../pinot/core/data/table/TableResizerTest.java | 85 ++++++ .../groupby/GroupByInSegmentTrimTest.java | 284 +++++++++++++++++++++ .../InterSegmentOrderByMultiValueQueriesTest.java | 14 + .../InterSegmentOrderBySingleValueQueriesTest.java | 16 ++ 18 files changed, 693 insertions(+), 51 deletions(-) diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java new file mode 100644 index 0000000..520c3e8 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IntermediateRecord.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.table; + +/** + * Helper class to store a subset of Record fields + * IntermediateRecord is derived from a Record + * Some of the main properties of an IntermediateRecord are: + * + * 1. Key in IntermediateRecord is expected to be identical to the one in the Record + * 2. For values, IntermediateRecord should only have the columns needed for order by + * 3. Inside the values, the columns should be ordered by the order by sequence + * 4. For order by on aggregations, final results are extracted + * 5. 
There is a mandatory field to store the original record to avoid a duplicate lookup + */ +public class IntermediateRecord { + public final Key _key; + public final Comparable[] _values; + public final Record _record; + + IntermediateRecord(Key key, Comparable[] values, Record record) { + _key = key; + _values = values; + _record = record; + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java index e306350..9f95bc0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; @@ -34,6 +35,8 @@ import org.apache.pinot.common.request.context.OrderByExpressionContext; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; import org.apache.pinot.core.query.postaggregation.PostAggregationFunction; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.spi.utils.ByteArray; @@ -128,7 +131,7 @@ public class TableResizer { for (int i = 0; i < _numOrderByExpressions; i++) { intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record); } - return new IntermediateRecord(key, intermediateRecordValues); + return new IntermediateRecord(key, intermediateRecordValues, record); } /** @@ -166,7 +169,7 @@ public class TableResizer { PriorityQueue<IntermediateRecord> priorityQueue = 
convertToIntermediateRecordsPQ(recordsMap, trimToSize, comparator); for (IntermediateRecord recordToRetain : priorityQueue) { - trimmedRecordsMap.put(recordToRetain._key, recordsMap.get(recordToRetain._key)); + trimmedRecordsMap.put(recordToRetain._key, recordToRetain._record); } return trimmedRecordsMap; } @@ -203,34 +206,51 @@ public class TableResizer { } int numRecordsToRetain = Math.min(numRecords, trimToSize); // make PQ of sorted records to retain - PriorityQueue<IntermediateRecord> priorityQueue = convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed()); + PriorityQueue<IntermediateRecord> priorityQueue = + convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed()); Record[] sortedArray = new Record[numRecordsToRetain]; while (!priorityQueue.isEmpty()) { IntermediateRecord intermediateRecord = priorityQueue.poll(); - Record record = recordsMap.get(intermediateRecord._key); - sortedArray[--numRecordsToRetain] = record; + sortedArray[--numRecordsToRetain] = intermediateRecord._record;; } return Arrays.asList(sortedArray); } /** - * Helper class to store a subset of Record fields - * IntermediateRecord is derived from a Record - * Some of the main properties of an IntermediateRecord are: - * - * 1. Key in IntermediateRecord is expected to be identical to the one in the Record - * 2. For values, IntermediateRecord should only have the columns needed for order by - * 3. Inside the values, the columns should be ordered by the order by sequence - * 4. For order by on aggregations, final results should extracted if the intermediate result is non-comparable + * Trims the aggregation results using a priority queue and returns the priority queue. + * This method is to be called from individual segment if the intermediate results need to be trimmed. 
*/ - private static class IntermediateRecord { - final Key _key; - final Comparable[] _values; + public PriorityQueue<IntermediateRecord> trimInSegmentResults(Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator, + GroupByResultHolder[] _groupByResultHolders, int size) { + int numAggregationFunctions = _aggregationFunctions.length; + int numColumns = numAggregationFunctions + _numGroupByExpressions; - IntermediateRecord(Key key, Comparable[] values) { - _key = key; - _values = values; + // Get comparator + Comparator<IntermediateRecord> comparator = _intermediateRecordComparator.reversed(); + PriorityQueue<IntermediateRecord> priorityQueue = new PriorityQueue<>(size, comparator); + while (groupKeyIterator.hasNext()) { + // Iterate over keys + GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next(); + Object[] keys = groupKey._keys; + Object[] values = Arrays.copyOf(keys, numColumns); + int groupId = groupKey._groupId; + for (int i = 0; i < numAggregationFunctions; i++) { + values[_numGroupByExpressions + i] = + _aggregationFunctions[i].extractGroupByResult(_groupByResultHolders[i], groupId); + } + // {key, intermediate_record, record} + IntermediateRecord intermediateRecord = getIntermediateRecord(new Key(keys), new Record(values)); + if (priorityQueue.size() < size) { + priorityQueue.offer(intermediateRecord); + } else { + IntermediateRecord peek = priorityQueue.peek(); + if (comparator.compare(peek, intermediateRecord) < 0) { + priorityQueue.poll(); + priorityQueue.offer(intermediateRecord); + } + } } + return priorityQueue; } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java index b50a836..ac99fff 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java @@ 
-38,6 +38,7 @@ import org.apache.pinot.core.common.BlockDocIdValueSet; import org.apache.pinot.core.common.BlockMetadata; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.datatable.DataTableBuilder; +import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.data.table.Table; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; @@ -57,6 +58,7 @@ public class IntermediateResultsBlock implements Block { private AggregationGroupByResult _aggregationGroupByResult; private List<Map<String, Object>> _combinedAggregationGroupByResult; private List<ProcessingException> _processingExceptions; + private Collection<IntermediateRecord> _intermediateRecords; private long _numDocsScanned; private long _numEntriesScannedInFilter; private long _numEntriesScannedPostFilter; @@ -117,6 +119,17 @@ public class IntermediateResultsBlock implements Block { _dataSchema = dataSchema; } + /** + * Constructor for aggregation group-by order-by result backed by a collection of pre-constructed + * intermediate records rather than an {@link AggregationGroupByResult}. 
+ */ + public IntermediateResultsBlock(AggregationFunction[] aggregationFunctions, + Collection<IntermediateRecord> intermediateRecords, DataSchema dataSchema) { + _aggregationFunctions = aggregationFunctions; + _dataSchema = dataSchema; + _intermediateRecords = intermediateRecords; + } + public IntermediateResultsBlock(Table table) { _table = table; _dataSchema = table.getDataSchema(); @@ -210,14 +223,14 @@ public class IntermediateResultsBlock implements Block { _executionThreadCpuTimeNs = executionThreadCpuTimeNs; } - public void setNumServerThreads(int numServerThreads) { - _numServerThreads = numServerThreads; - } - public int getNumServerThreads() { return _numServerThreads; } + public void setNumServerThreads(int numServerThreads) { + _numServerThreads = numServerThreads; + } + @VisibleForTesting public long getNumDocsScanned() { return _numDocsScanned; @@ -281,6 +294,14 @@ public class IntermediateResultsBlock implements Block { _numGroupsLimitReached = numGroupsLimitReached; } + /** + * Get the collection of intermediate records + */ + @Nullable + public Collection<IntermediateRecord> getIntermediateRecords() { + return _intermediateRecords; + } + public DataTable getDataTable() throws Exception { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index 071a682..a230561 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.operator.combine; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; @@ -34,6 +35,7 @@ import 
org.apache.pinot.common.response.ProcessingException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.data.table.ConcurrentIndexedTable; +import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; @@ -128,19 +130,30 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { } // Merge aggregation group-by result. - AggregationGroupByResult aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult(); - if (aggregationGroupByResult != null) { - // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable - Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator(); - while (groupKeyIterator.hasNext()) { - GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next(); - Object[] keys = groupKey._keys; - Object[] values = Arrays.copyOf(keys, _numColumns); - int groupId = groupKey._groupId; - for (int i = 0; i < _numAggregationFunctions; i++) { - values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId); + // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable + Collection<IntermediateRecord> intermediateRecords = intermediateResultsBlock.getIntermediateRecords(); + // For now, only GroupBy OrderBy query has pre-constructed intermediate records + if (intermediateRecords == null) { + // Merge aggregation group-by result. 
+ AggregationGroupByResult aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult(); + if (aggregationGroupByResult != null) { + // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable + Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator = aggregationGroupByResult.getGroupKeyIterator(); + while (dicGroupKeyIterator.hasNext()) { + GroupKeyGenerator.GroupKey groupKey = dicGroupKeyIterator.next(); + Object[] keys = groupKey._keys; + Object[] values = Arrays.copyOf(keys, _numColumns); + int groupId = groupKey._groupId; + for (int i = 0; i < _numAggregationFunctions; i++) { + values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId); + } + _indexedTable.upsert(new Key(keys), new Record(values)); } - _indexedTable.upsert(new Key(keys), new Record(values)); + } + } else { + for (IntermediateRecord intermediateResult : intermediateRecords) { + //TODO: change upsert api so that it accepts intermediateRecord directly + _indexedTable.upsert(intermediateResult._key, intermediateResult._record); } } } catch (EarlyTerminationException e) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java index 883d718..ebc6947 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java @@ -18,8 +18,11 @@ */ package org.apache.pinot.core.operator.query; +import java.util.Collection; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.data.table.IntermediateRecord; +import org.apache.pinot.core.data.table.TableResizer; import org.apache.pinot.core.operator.BaseOperator; import 
org.apache.pinot.core.operator.ExecutionStatistics; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; @@ -28,8 +31,11 @@ import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor; import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor; +import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor; +import static org.apache.pinot.core.util.GroupByUtils.getTableCapacity; + /** * The <code>AggregationGroupByOrderByOperator</code> class provides the operator for aggregation group-by query on a @@ -43,16 +49,19 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate private final ExpressionContext[] _groupByExpressions; private final int _maxInitialResultHolderCapacity; private final int _numGroupsLimit; + private final int _minSegmentTrimSize; private final TransformOperator _transformOperator; private final long _numTotalDocs; private final boolean _useStarTree; private final DataSchema _dataSchema; + private final QueryContext _queryContext; private int _numDocsScanned = 0; public AggregationGroupByOrderByOperator(AggregationFunction[] aggregationFunctions, ExpressionContext[] groupByExpressions, int maxInitialResultHolderCapacity, int numGroupsLimit, - TransformOperator transformOperator, long numTotalDocs, boolean useStarTree) { + int minSegmentTrimSize, TransformOperator transformOperator, long numTotalDocs, QueryContext queryContext, + boolean useStarTree) { _aggregationFunctions = aggregationFunctions; _groupByExpressions = groupByExpressions; _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity; @@ -60,6 +69,8 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate _transformOperator = transformOperator; _numTotalDocs = 
numTotalDocs; _useStarTree = useStarTree; + _queryContext = queryContext; + _minSegmentTrimSize = minSegmentTrimSize; // NOTE: The indexedTable expects that the the data schema will have group by columns before aggregation columns int numGroupByExpressions = groupByExpressions.length; @@ -106,8 +117,20 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate groupByExecutor.process(transformBlock); } - // Build intermediate result block based on aggregation group-by result from the executor - return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema); + // There is no OrderBy or minSegmentTrimSize is set to be negative or 0 + if (_queryContext.getOrderByExpressions() == null || _minSegmentTrimSize <= 0) { + // Build intermediate result block based on aggregation group-by result from the executor + return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema); + } + int trimSize = getTableCapacity(_queryContext.getLimit(), _minSegmentTrimSize); + // Num of groups hasn't reached the threshold + if (groupByExecutor.getNumGroups() <= trimSize) { + return new IntermediateResultsBlock(_aggregationFunctions, groupByExecutor.getResult(), _dataSchema); + } + // Trim + TableResizer tableResizer = new TableResizer(_dataSchema, _queryContext); + Collection<IntermediateRecord> intermediateRecords = groupByExecutor.trimGroupByResult(trimSize, tableResizer); + return new IntermediateResultsBlock(_aggregationFunctions, intermediateRecords, _dataSchema); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java index a392948..915eb5e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java +++ 
b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java @@ -33,7 +33,6 @@ import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair; import org.apache.pinot.segment.spi.index.startree.StarTreeV2; - /** * The <code>AggregationGroupByOrderByPlanNode</code> class provides the execution plan for aggregation group-by order-by query on a * single segment. @@ -43,13 +42,15 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode { private final IndexSegment _indexSegment; private final int _maxInitialResultHolderCapacity; private final int _numGroupsLimit; + private final int _minSegmentTrimSize; private final AggregationFunction[] _aggregationFunctions; private final ExpressionContext[] _groupByExpressions; private final TransformPlanNode _transformPlanNode; private final StarTreeTransformPlanNode _starTreeTransformPlanNode; + private final QueryContext _queryContext; public AggregationGroupByOrderByPlanNode(IndexSegment indexSegment, QueryContext queryContext, - int maxInitialResultHolderCapacity, int numGroupsLimit) { + int maxInitialResultHolderCapacity, int numGroupsLimit, int minSegmentTrimSize) { _indexSegment = indexSegment; _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity; _numGroupsLimit = numGroupsLimit; @@ -58,6 +59,8 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode { List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions(); assert groupByExpressions != null; _groupByExpressions = groupByExpressions.toArray(new ExpressionContext[0]); + _queryContext = queryContext; + _minSegmentTrimSize = minSegmentTrimSize; List<StarTreeV2> starTrees = indexSegment.getStarTrees(); if (starTrees != null && !StarTreeUtils.isStarTreeDisabled(queryContext)) { @@ -95,11 +98,13 @@ public class AggregationGroupByOrderByPlanNode implements PlanNode { if (_transformPlanNode != null) { // Do not use 
star-tree return new AggregationGroupByOrderByOperator(_aggregationFunctions, _groupByExpressions, - _maxInitialResultHolderCapacity, _numGroupsLimit, _transformPlanNode.run(), numTotalDocs, false); + _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentTrimSize, _transformPlanNode.run(), numTotalDocs, + _queryContext, false); } else { // Use star-tree return new AggregationGroupByOrderByOperator(_aggregationFunctions, _groupByExpressions, - _maxInitialResultHolderCapacity, _numGroupsLimit, _starTreeTransformPlanNode.run(), numTotalDocs, true); + _maxInitialResultHolderCapacity, _numGroupsLimit, _minSegmentTrimSize, _starTreeTransformPlanNode.run(), + numTotalDocs, _queryContext, true); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index 4c801e3..3f94d80 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -44,6 +44,7 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils import org.apache.pinot.core.query.config.QueryExecutorConfig; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.core.query.request.context.utils.QueryContextUtils; +import org.apache.pinot.core.util.GroupByUtils; import org.apache.pinot.core.util.QueryOptions; import org.apache.pinot.segment.spi.AggregationFunctionType; import org.apache.pinot.segment.spi.IndexSegment; @@ -56,28 +57,32 @@ import org.slf4j.LoggerFactory; * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}. 
*/ public class InstancePlanMakerImplV2 implements PlanMaker { - private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); - public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY_KEY = "max.init.group.holder.capacity"; public static final int DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000; public static final String NUM_GROUPS_LIMIT = "num.groups.limit"; public static final int DEFAULT_NUM_GROUPS_LIMIT = 100_000; - + public static final String ENABLE_SEGMENT_GROUP_TRIM = "enable.segment.group.trim"; + public static final boolean DEFAULT_ENABLE_SEGMENT_GROUP_TRIM = false; + public static final String MIN_SEGMENT_GROUP_TRIM_SIZE = "min.segment.group.trim.size"; + public static final int DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE = -1; // set as pinot.server.query.executor.groupby.trim.threshold public static final String GROUPBY_TRIM_THRESHOLD = "groupby.trim.threshold"; public static final int DEFAULT_GROUPBY_TRIM_THRESHOLD = 1_000_000; + private static final Logger LOGGER = LoggerFactory.getLogger(InstancePlanMakerImplV2.class); private final int _maxInitialResultHolderCapacity; // Limit on number of groups stored for each segment, beyond which no new group will be created private final int _numGroupsLimit; // Used for SQL GROUP BY (server combine) private final int _groupByTrimThreshold; + private final int _minSegmentGroupTrimSize; @VisibleForTesting public InstancePlanMakerImplV2() { _maxInitialResultHolderCapacity = DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY; _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT; _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD; + _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE; } @VisibleForTesting @@ -85,6 +90,15 @@ public class InstancePlanMakerImplV2 implements PlanMaker { _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity; _numGroupsLimit = numGroupsLimit; _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD; + _minSegmentGroupTrimSize = 
DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE; + } + + @VisibleForTesting + public InstancePlanMakerImplV2(int minSegmentGroupTrimSize) { + _maxInitialResultHolderCapacity = DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY; + _numGroupsLimit = DEFAULT_NUM_GROUPS_LIMIT; + _groupByTrimThreshold = DEFAULT_GROUPBY_TRIM_THRESHOLD; + _minSegmentGroupTrimSize = minSegmentGroupTrimSize; } /** @@ -105,8 +119,22 @@ public class InstancePlanMakerImplV2 implements PlanMaker { Preconditions.checkState(_maxInitialResultHolderCapacity <= _numGroupsLimit, "Invalid configuration: maxInitialResultHolderCapacity: %d must be smaller or equal to numGroupsLimit: %d", _maxInitialResultHolderCapacity, _numGroupsLimit); - LOGGER.info("Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}", - _maxInitialResultHolderCapacity, _numGroupsLimit); + boolean enableSegmentGroupTrim = + queryExecutorConfig.getConfig().getProperty(ENABLE_SEGMENT_GROUP_TRIM, DEFAULT_ENABLE_SEGMENT_GROUP_TRIM); + int minSegmentGroupTrimSize = + queryExecutorConfig.getConfig().getProperty(MIN_SEGMENT_GROUP_TRIM_SIZE, DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE); + + if (minSegmentGroupTrimSize > 0) { + _minSegmentGroupTrimSize = minSegmentGroupTrimSize; + } else if (enableSegmentGroupTrim) { + _minSegmentGroupTrimSize = GroupByUtils.DEFAULT_MIN_NUM_GROUPS; + } else { + _minSegmentGroupTrimSize = DEFAULT_MIN_SEGMENT_GROUP_TRIM_SIZE; + } + LOGGER.info( + "Initializing plan maker with maxInitialResultHolderCapacity: {}, numGroupsLimit: {}, enableSegmentTrim: {}, minSegmentGroupTrimSize: {}", + _maxInitialResultHolderCapacity, _numGroupsLimit, minSegmentGroupTrimSize > 0 || enableSegmentGroupTrim, + _minSegmentGroupTrimSize); } @Override @@ -138,7 +166,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { // new Combine operator only when GROUP_BY_MODE explicitly set to SQL if (queryOptions.isGroupByModeSQL()) { return new AggregationGroupByOrderByPlanNode(indexSegment, queryContext, 
_maxInitialResultHolderCapacity, - _numGroupsLimit); + _numGroupsLimit, _minSegmentGroupTrimSize); } return new AggregationGroupByPlanNode(indexSegment, queryContext, _maxInitialResultHolderCapacity, _numGroupsLimit); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java index d67b4f2..de9cfa9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DefaultGroupByExecutor.java @@ -18,9 +18,12 @@ */ package org.apache.pinot.core.query.aggregation.groupby; +import java.util.Collection; import java.util.Map; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.core.common.BlockValSet; +import org.apache.pinot.core.data.table.IntermediateRecord; +import org.apache.pinot.core.data.table.TableResizer; import org.apache.pinot.core.operator.blocks.TransformBlock; import org.apache.pinot.core.operator.transform.TransformOperator; import org.apache.pinot.core.operator.transform.TransformResultMetadata; @@ -144,4 +147,14 @@ public class DefaultGroupByExecutor implements GroupByExecutor { public AggregationGroupByResult getResult() { return new AggregationGroupByResult(_groupKeyGenerator, _aggregationFunctions, _groupByResultHolders); } + + @Override + public int getNumGroups() { + return _groupKeyGenerator.getNumKeys(); + } + + @Override + public Collection<IntermediateRecord> trimGroupByResult(int trimSize, TableResizer tableResizer) { + return tableResizer.trimInSegmentResults(_groupKeyGenerator.getGroupKeys(), _groupByResultHolders, trimSize); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java index 79b7afe..3c63a0d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java @@ -206,6 +206,9 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { return _rawKeyHolder.getStringGroupKeys(); } + @Override + public int getNumKeys() { return _rawKeyHolder.getNumKeys(); } + private interface RawKeyHolder { /** @@ -240,11 +243,18 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { * Returns an iterator of {@link StringGroupKey}. Use this interface to iterate through all the group keys. */ Iterator<StringGroupKey> getStringGroupKeys(); + + /** + * Returns current number of unique keys + */ + int getNumKeys(); + } private class ArrayBasedHolder implements RawKeyHolder { // TODO: using bitmap might better private final boolean[] _flags = new boolean[_globalGroupIdUpperBound]; + private int _numKeys = 0; @Override public void processSingleValue(int numDocs, int[] outGroupIds) { @@ -254,6 +264,10 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { groupId = groupId * _cardinalities[j] + _singleValueDictIds[j][i]; } outGroupIds[i] = groupId; + // if the flag is false, then increase the key num + if (!_flags[groupId]) { + _numKeys++; + } _flags[groupId] = true; } } @@ -263,6 +277,9 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { for (int i = 0; i < numDocs; i++) { int[] groupIds = getIntRawKeys(i); for (int groupId : groupIds) { + if (!_flags[groupId]) { + _numKeys++; + } _flags[groupId] = true; } outGroupIds[i] = groupIds; @@ -331,6 +348,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { } }; } + + @Override + public int getNumKeys() { + 
return _numKeys; + } } private class IntMapBasedHolder implements RawKeyHolder { @@ -419,6 +441,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { } }; } + + @Override + public int getNumKeys() { + return _groupIdMap.size(); + } } /** @@ -634,6 +661,10 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { } }; } + @Override + public int getNumKeys() { + return _groupIdMap.size(); + } } /** @@ -836,6 +867,11 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator { } }; } + + @Override + public int getNumKeys() { + return _groupIdMap.size(); + } } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java index c1266c5..869ef5d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupByExecutor.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.core.query.aggregation.groupby; +import java.util.Collection; +import org.apache.pinot.core.data.table.IntermediateRecord; +import org.apache.pinot.core.data.table.TableResizer; import org.apache.pinot.core.operator.blocks.TransformBlock; @@ -40,4 +43,19 @@ public interface GroupByExecutor { * @return Result of aggregation */ AggregationGroupByResult getResult(); + + /** + * Returns the number of generated results + * + * @return Number of results + */ + int getNumGroups(); + + /** + * Trim the GroupBy result up to the threshold max(configurable_threshold * 5, minTrimSize) + * TODO: benchmark the performance of PQ vs. topK + * <p>Should be called after all transform blocks have been processed. 
+ * + */ + Collection<IntermediateRecord> trimGroupByResult(int trimSize, TableResizer tableResizer); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java index c9ceaea..592ad19 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java @@ -78,6 +78,11 @@ public interface GroupKeyGenerator { Iterator<StringGroupKey> getStringGroupKeys(); /** + * Return current number of unique keys + */ + int getNumKeys(); + + /** * This class encapsulates the integer group id and the group keys. */ class GroupKey { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java index d8c7c8e..9f9422a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java @@ -337,6 +337,11 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat } } + @Override + public int getNumKeys() { + return _groupKeyMap.size(); + } + /** * Iterator for {@link GroupKey}. 
*/ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java index 12b827b..cfee809 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionarySingleColumnGroupKeyGenerator.java @@ -200,6 +200,11 @@ public class NoDictionarySingleColumnGroupKeyGenerator implements GroupKeyGenera } } + @Override + public int getNumKeys() { + return _groupKeyMap.size(); + } + private int getKeyForValue(int value) { Int2IntMap map = (Int2IntMap) _groupKeyMap; int groupId = map.get(value); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java index dd29a29..bf0aa03 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java @@ -25,7 +25,7 @@ public final class GroupByUtils { private GroupByUtils() { } - private static final int NUM_RESULTS_LOWER_LIMIT = 5000; + public static final int DEFAULT_MIN_NUM_GROUPS = 5000; /** * (For PQL semantic) Returns the capacity of the table required by the given query. @@ -33,7 +33,16 @@ public final class GroupByUtils { * in PQL semantic. */ public static int getTableCapacity(int limit) { - return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT); + return Math.max(limit * 5, DEFAULT_MIN_NUM_GROUPS); + } + + /** + * Returns the capacity of the table required by the given query. 
+ * NOTE: It returns {@code max(limit * 5, minNumGroups)} where minNumGroups is configured by the user + * (Default: 5000) + */ + public static int getTableCapacity(int limit, int minNumGroups) { + return Math.max(limit * 5, minNumGroups); } /** @@ -46,7 +55,7 @@ public final class GroupByUtils { public static int getTableCapacity(QueryContext queryContext) { int limit = queryContext.getLimit(); if (queryContext.getOrderByExpressions() != null || queryContext.getHavingFilter() != null) { - return Math.max(limit * 5, NUM_RESULTS_LOWER_LIMIT); + return Math.max(limit * 5, DEFAULT_MIN_NUM_GROUPS); } else { return limit; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java index 0f8eb1a..bf85846 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java @@ -21,9 +21,15 @@ package org.apache.pinot.core.data.table; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.query.aggregation.groupby.DoubleGroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; +import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator; +import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; import org.apache.pinot.segment.local.customobject.AvgPair; import org.testng.annotations.BeforeClass; @@ -43,10 +49,13 @@ public class TableResizerTest { new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)", "distinctcount(m3)", "avg(m4)"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.OBJECT}); private static final int TRIM_TO_SIZE = 3; + private static final int NUM_RESULT_HOLDER = 4; private Map<Key, Record> _recordsMap; private List<Record> _records; private List<Key> _keys; + private List<GroupKeyGenerator.GroupKey> _groupKeys; + private GroupByResultHolder[] _groupByResultHolders; @BeforeClass public void setUp() { @@ -66,6 +75,35 @@ public class TableResizerTest { new Key(new Object[]{"c", 300, 5.0}) ); //@formatter:on + List<Object[]> objectArray = Arrays.asList( + new Object[]{"a", 10, 1.0}, + new Object[]{"b", 10, 2.0}, + new Object[]{"c", 200, 3.0}, + new Object[]{"c", 50, 4.0}, + new Object[]{"c", 300, 5.0}); + + // Use _keys for _groupKeys + _groupKeys = new LinkedList<>(); + for (int i = 0; i < _keys.size(); ++i) { + GroupKeyGenerator.GroupKey groupKey = new GroupKeyGenerator.GroupKey(); + groupKey._keys = objectArray.get(i); + groupKey._groupId = i; + _groupKeys.add(groupKey); + } + + // groupByResults are the same as _records + _groupByResultHolders = new GroupByResultHolder[NUM_RESULT_HOLDER]; + _groupByResultHolders[0] = new DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0); + _groupByResultHolders[1] = new DoubleGroupByResultHolder(_groupKeys.size(), _groupKeys.size(), 0.0); + _groupByResultHolders[2] = new ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size()); + _groupByResultHolders[3] = new ObjectGroupByResultHolder(_groupKeys.size(), _groupKeys.size()); + for (int i = 0; i < _groupKeys.size(); ++i) { + _groupByResultHolders[0].setValueForKey(_groupKeys.get(i)._groupId, (double)_records.get(i).getValues()[3]); + _groupByResultHolders[1].setValueForKey(_groupKeys.get(i)._groupId, (double)_records.get(i).getValues()[4]); + 
_groupByResultHolders[2].setValueForKey(_groupKeys.get(i)._groupId, _records.get(i).getValues()[5]); + _groupByResultHolders[3].setValueForKey(_groupKeys.get(i)._groupId, _records.get(i).getValues()[6]); + } + _recordsMap = new HashMap<>(); int numRecords = _records.size(); for (int i = 0; i < numRecords; i++) { @@ -289,4 +327,51 @@ public class TableResizerTest { assertEquals(sortedRecords.get(1), _records.get(0)); assertEquals(sortedRecords.get(2), _records.get(3)); } + + /** + * Tests in-segment trim from 5 records to 3 records (TRIM_TO_SIZE) + */ + @Test + public void testInSegmentTrim() { + TableResizer tableResizer = + new TableResizer(DATA_SCHEMA, QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "d3 DESC")); + PriorityQueue<IntermediateRecord> results = + tableResizer.trimInSegmentResults(_groupKeys.listIterator(), _groupByResultHolders, TRIM_TO_SIZE); + assertEquals(results.size(), TRIM_TO_SIZE); + IntermediateRecord[] resultArray = new IntermediateRecord[results.size()]; + for (int i = 0; i < TRIM_TO_SIZE; ++i) { + IntermediateRecord result = results.poll(); + resultArray[i] = result; + } + // _records[4], _records[3], _records[2] + assertEquals(resultArray[0]._record, _records.get(2)); + assertEquals(resultArray[1]._record, _records.get(3)); + assertEquals(resultArray[2]._record, _records.get(4)); + + tableResizer = new TableResizer(DATA_SCHEMA, QueryContextConverterUtils + .getQueryContextFromSQL(QUERY_PREFIX + "SUM(m1) DESC, max(m2) DESC, DISTINCTCOUNT(m3) DESC")); + results = tableResizer.trimInSegmentResults(_groupKeys.listIterator(), _groupByResultHolders, TRIM_TO_SIZE); + assertEquals(results.size(), TRIM_TO_SIZE); + for (int i = 0; i < TRIM_TO_SIZE; ++i) { + IntermediateRecord result = results.poll(); + resultArray[i] = result; + } + // _records[2], _records[3], _records[1] + assertEquals(resultArray[0]._record, _records.get(1)); + assertEquals(resultArray[1]._record, _records.get(3)); + assertEquals(resultArray[2]._record, _records.get(2)); + + 
tableResizer = new TableResizer(DATA_SCHEMA, + QueryContextConverterUtils.getQueryContextFromSQL(QUERY_PREFIX + "DISTINCTCOUNT(m3) DESC, AVG(m4) ASC")); + results = tableResizer.trimInSegmentResults(_groupKeys.listIterator(), _groupByResultHolders, TRIM_TO_SIZE); + assertEquals(results.size(), TRIM_TO_SIZE); + for (int i = 0; i < TRIM_TO_SIZE; ++i) { + IntermediateRecord result = results.poll(); + resultArray[i] = result; + } + // _records[4], _records[3], _records[1] + assertEquals(resultArray[0]._record, _records.get(1)); + assertEquals(resultArray[1]._record, _records.get(3)); + assertEquals(resultArray[2]._record, _records.get(4)); + } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java new file mode 100644 index 0000000..c50ef8e --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/aggregation/groupby/GroupByInSegmentTrimTest.java @@ -0,0 +1,284 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.core.query.aggregation.groupby; + +import java.io.File; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.data.table.IntermediateRecord; +import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; +import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator; +import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode; +import org.apache.pinot.core.query.request.context.QueryContext; +import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils; +import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; +import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; +import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; +import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.MetricFieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.data.readers.GenericRow; +import org.apache.pinot.spi.utils.Pair; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import static java.lang.Math.max; +import static org.testng.Assert.assertEquals; + + +/** + * Unit test for GroupBy Trim functionalities. + * - Builds a segment with random data. 
+ * - Uses AggregationGroupByOrderByPlanNode class to construct an AggregationGroupByOrderByOperator + * - Perform aggregationGroupBy and OrderBy on the data + * - Also computes those results itself. + * - Asserts that the aggregation results returned by the class are the same as + * returned by the local computations. + * + * Currently tests 'max' functions, and can be easily extended to + * test other conditions such as GroupBy without OrderBy + */ +public class GroupByInSegmentTrimTest { + private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "GroupByInSegmentTrimTest"); + private static final String SEGMENT_NAME = "TestGroupByInSegment"; + + private static final String METRIC_PREFIX = "metric_"; + private static final int NUM_ROWS = 1000; + private static final int NUM_COLUMN = 2; + private static final int MAX_INITIAL_RESULT_HOLDER_CAPACITY = 10_000; + private static final int NUM_GROUPS_LIMIT = 100_000; + private static IndexSegment _indexSegment; + private static String[] _columns; + private static double[][] _inputData; + private static Map<Double, Double> _resultMap; + + /** + * Initializations prior to the test: + * - Build a segment with metric columns (that will be aggregated and grouped) containing + * randomly generated data. 
+ * + * @throws Exception + */ + @BeforeClass + public void setUp() + throws Exception { + _resultMap = new HashMap<>(); + // Current Schema: Columns: metric_0(double), metric_1(double) + _inputData = new double[NUM_COLUMN][NUM_ROWS]; + _columns = new String[NUM_COLUMN]; + setupSegment(); + } + + /** + * Test the GroupBy OrderBy query and compute the expected results to match + */ + @Test(dataProvider = "QueryDataProvider") + void TestGroupByOrderByOperator(int trimSize, List<Pair<Double, Double>> expectedResult, QueryContext queryContext) { + // Create a query plan + AggregationGroupByOrderByPlanNode aggregationGroupByOrderByPlanNode = + new AggregationGroupByOrderByPlanNode(_indexSegment, queryContext, MAX_INITIAL_RESULT_HOLDER_CAPACITY, + NUM_GROUPS_LIMIT, trimSize); + + // Get the query executor + AggregationGroupByOrderByOperator aggregationGroupByOrderByOperator = aggregationGroupByOrderByPlanNode.run(); + + // Extract the execution result + IntermediateResultsBlock resultsBlock = aggregationGroupByOrderByOperator.nextBlock(); + ArrayList<Pair<Double, Double>> extractedResult = extractTestResult(resultsBlock); + + assertEquals(extractedResult, expectedResult); + } + + /** + * Helper method to set up the index segment on which to perform aggregation tests. + * - Generates a segment with {@link #NUM_COLUMN} and {@link #NUM_ROWS} + * - Deterministic 'double' data (baseValue + row index + column index) filled in the metric columns. The data is also populated + * into the _inputData[], so it can be used to test the results. 
+ * + * @throws Exception + */ + private void setupSegment() + throws Exception { + if (INDEX_DIR.exists()) { + FileUtils.deleteQuietly(INDEX_DIR); + } + + // Segment Config + SegmentGeneratorConfig config = + new SegmentGeneratorConfig(new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build(), + buildSchema()); + config.setSegmentName(SEGMENT_NAME); + config.setOutDir(INDEX_DIR.getAbsolutePath()); + + // Fill the data table + List<GenericRow> rows = new ArrayList<>(NUM_ROWS); + int baseValue = 10; + for (int i = 0; i < NUM_ROWS; i++) { + GenericRow genericRow = new GenericRow(); + + for (int j = 0; j < _columns.length; j++) { + String metricName = _columns[j]; + double value = baseValue + i + j; + _inputData[j][i] = value; + genericRow.putValue(metricName, value); + } + // Compute the max result and insert into a grouped map + computeMaxResult(_inputData[0][i], _inputData[1][i]); + rows.add(genericRow); + baseValue += 10; + } + + SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl(); + driver.init(config, new GenericRowRecordReader(rows)); + driver.build(); + + _indexSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, driver.getSegmentName()), ReadMode.heap); + } + + /** + * Helper method to build schema for the segment on which aggregation tests will be run. 
+ * + * @return table schema + */ + private Schema buildSchema() { + Schema schema = new Schema(); + + for (int i = 0; i < NUM_COLUMN; i++) { + String metricName = METRIC_PREFIX + i; + MetricFieldSpec metricFieldSpec = new MetricFieldSpec(metricName, FieldSpec.DataType.DOUBLE); + schema.addField(metricFieldSpec); + _columns[i] = metricName; + } + return schema; + } + + /** + * Helper method to compute the aggregation result grouped by the key + * + */ + private void computeMaxResult(Double key, Double result) { + if (_resultMap.get(key) == null || _resultMap.get(key) < result) { + _resultMap.put(key, result); + } + } + + /** + * Helper method to extract the result from IntermediateResultsBlock + * + * @return A list of expected results + */ + private ArrayList<Pair<Double, Double>> extractTestResult(IntermediateResultsBlock resultsBlock) { + AggregationGroupByResult result = resultsBlock.getAggregationGroupByResult(); + if (result != null) { + // No trim + return extractAggregationResult(result); + } else { + // In case of trim + return extractIntermediateResult(resultsBlock.getIntermediateRecords()); + } + } + + /** + * Helper method to extract the result from AggregationGroupByResult + * + * @return A list of expected results + */ + private ArrayList<Pair<Double, Double>> extractAggregationResult(AggregationGroupByResult aggregationGroupByResult) { + ArrayList<Pair<Double, Double>> result = new ArrayList<>(); + Iterator<GroupKeyGenerator.GroupKey> iterator = aggregationGroupByResult.getGroupKeyIterator(); + int i = 0; + while (iterator.hasNext()) { + GroupKeyGenerator.GroupKey groupKey = iterator.next(); + Double key = (Double) groupKey._keys[0]; + Double value = (Double) aggregationGroupByResult.getResultForGroupId(i, groupKey._groupId); + result.add(new Pair<>(key, value)); + } + result.sort((o1, o2) -> (int) (o2.getSecond() - o1.getSecond())); + return result; + } + + /** + * Helper method to extract the result from Collection<IntermediateRecord> + * + * 
@return A list of expected results + */ + private ArrayList<Pair<Double, Double>> extractIntermediateResult(Collection<IntermediateRecord> intermediateRecord) { + ArrayList<Pair<Double, Double>> result = new ArrayList<>(); + PriorityQueue<IntermediateRecord> resultPQ = new PriorityQueue<>(intermediateRecord); + while (!resultPQ.isEmpty()) { + IntermediateRecord head = resultPQ.poll(); + result.add(new Pair<>((Double) head._record.getValues()[0], (Double) head._record.getValues()[1])); + } + Collections.reverse(result); + return result; + } + + @DataProvider + public static Object[][] QueryDataProvider() { + List<Object[]> data = new ArrayList<>(); + ArrayList<Pair<Double, Double>> expectedResult = computeExpectedResult(); + // Testcase1: low limit + high trim size + QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL( + "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER BY max(metric_1) DESC LIMIT 1"); + int trimSize = 100; + int expectedSize = max(trimSize, 5 * queryContext.getLimit()); + data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize), queryContext}); + // Testcase2: high limit + low trim size + queryContext = QueryContextConverterUtils.getQueryContextFromSQL( + "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER BY max(metric_1) DESC LIMIT 50"); + trimSize = 10; + expectedSize = max(trimSize, 5 * queryContext.getLimit()); + data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize), queryContext}); + // Testcase3: high limit + high trim size (No trim) + queryContext = QueryContextConverterUtils.getQueryContextFromSQL( + "SELECT metric_0, max(metric_1) FROM testTable GROUP BY metric_0 ORDER BY max(metric_1) DESC LIMIT 500"); + trimSize = 1000; + expectedSize = 1000; + data.add(new Object[]{trimSize, expectedResult.subList(0, expectedSize), queryContext}); + + return data.toArray(new Object[data.size()][]); + } + + /** + * Helper method to compute the expected 
result + * + * @return A list of expected results + */ + private static ArrayList<Pair<Double, Double>> computeExpectedResult() { + ArrayList<Pair<Double, Double>> result = new ArrayList<>(); + for (Map.Entry<Double, Double> entry : _resultMap.entrySet()) { + result.add(new Pair<>(entry.getKey(), entry.getValue())); + } + result.sort((o1, o2) -> (int) (o2.getSecond() - o1.getSecond())); + return result; + } +} diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java index c9ce9db..5dfcead 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.List; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; @@ -42,6 +43,19 @@ public class InterSegmentOrderByMultiValueQueriesTest extends BaseMultiValueQuer } /** + * Tests the in-segment build option for GroupBy OrderBy query. 
(No trim) + */ + @Test(dataProvider = "orderByDataProvider") + public void testGroupByOrderByMVSegmentTrimSQLResults(String query, List<Object[]> expectedResults, + long expectedNumEntriesScannedPostFilter, DataSchema expectedDataSchema) { + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(1); + BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, planMaker); + QueriesTestUtils + .testInterSegmentResultTable(brokerResponse, 400000L, 0, expectedNumEntriesScannedPostFilter, 400000L, + expectedResults, expectedResults.size(), expectedDataSchema); + } + + /** * Provides various combinations of order by. * In order to calculate the expected results, the results from a group by were taken, and then ordered accordingly. */ diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java index a10e542..ddd2d11 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java @@ -30,6 +30,7 @@ import org.apache.pinot.common.response.broker.AggregationResult; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.GroupByResult; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request; import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey; import org.testng.Assert; @@ -73,6 +74,21 @@ public class InterSegmentOrderBySingleValueQueriesTest extends BaseSingleValueQu } /** + * Tests the in-segment build option for GroupBy OrderBy query. 
(No trim) + */ + @Test(dataProvider = "orderBySQLResultTableProvider") + public void testGroupByOrderByMVSegmentTrimSQLResults(String query, List<Object[]> expectedResults, + long expectedNumDocsScanned, long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, + long expectedNumTotalDocs, DataSchema expectedDataSchema) { + InstancePlanMakerImplV2 planMaker = new InstancePlanMakerImplV2(1); + BrokerResponseNative brokerResponse = getBrokerResponseForSqlQuery(query, planMaker); + QueriesTestUtils + .testInterSegmentResultTable(brokerResponse, expectedNumDocsScanned, expectedNumEntriesScannedInFilter, + expectedNumEntriesScannedPostFilter, expectedNumTotalDocs, expectedResults, expectedResults.size(), + expectedDataSchema); + } + + /** * Tests the query options for groupByMode, responseFormat. * pql, pql - does not execute order by, returns aggregationResults * pql, sql - does not execute order by, returns aggregationResults --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org