This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new a8fbdae Optimize DistinctCount to store dictIds within segment (#5765) a8fbdae is described below commit a8fbdaeffaf99c1df4258f60ecbbab1f553d7696 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Jul 29 00:34:11 2020 -0700 Optimize DistinctCount to store dictIds within segment (#5765) For DistinctCount aggregation function, we can store dictIds within segment without fetching the values, and read the values from dictionary before returning the result for the segment. --- .../org/apache/pinot/core/common/BlockValSet.java | 8 + .../apache/pinot/core/common/DataBlockCache.java | 11 +- .../pinot/core/operator/ProjectionOperator.java | 9 +- .../core/operator/blocks/ProjectionBlock.java | 39 ++--- .../pinot/core/operator/blocks/TransformBlock.java | 4 - .../operator/docvalsets/ProjectionBlockValSet.java | 28 ++-- .../operator/docvalsets/TransformBlockValSet.java | 8 + .../function/DistinctCountAggregationFunction.java | 165 +++++++++++++++------ .../DistinctCountMVAggregationFunction.java | 53 ++++++- 9 files changed, 230 insertions(+), 95 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java index 2a0f875..e7574a7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/BlockValSet.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.core.common; +import javax.annotation.Nullable; +import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -38,6 +40,12 @@ public interface BlockValSet { boolean isSingleValue(); /** + * Returns the dictionary for the column, or {@code null} if the column is not dictionary-encoded. + */ + @Nullable + Dictionary getDictionary(); + + /** * SINGLE-VALUED COLUMN APIs */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java index fbaeeed..141ecfb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataBlockCache.java @@ -23,9 +23,9 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import javax.annotation.Nonnull; +import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.utils.EqualityUtils; -import org.apache.pinot.core.plan.DocIdSetPlanNode; /** @@ -71,6 +71,15 @@ public class DataBlockCache { } /** + * Returns the number of documents within the current block. + * + * @return Number of documents within the current block + */ + public int getNumDocs() { + return _length; + } + + /** * SINGLE-VALUED COLUMN API */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java index 32ef7c8..d525ce7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java @@ -18,13 +18,11 @@ */ package org.apache.pinot.core.operator; -import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.core.common.DataBlockCache; import org.apache.pinot.core.common.DataFetcher; import org.apache.pinot.core.common.DataSource; -import org.apache.pinot.core.common.DataSourceMetadata; import org.apache.pinot.core.operator.blocks.DocIdSetBlock; import org.apache.pinot.core.operator.blocks.ProjectionBlock; @@ -33,17 +31,12 @@ public class ProjectionOperator extends BaseOperator<ProjectionBlock> { private static final String OPERATOR_NAME = "ProjectionOperator"; private final Map<String, DataSource> _dataSourceMap; - private final Map<String, DataSourceMetadata> _dataSourceMetadataMap; private final BaseOperator<DocIdSetBlock> _docIdSetOperator; private final DataBlockCache _dataBlockCache; public ProjectionOperator(Map<String, DataSource> dataSourceMap, @Nullable BaseOperator<DocIdSetBlock> docIdSetOperator) { _dataSourceMap = dataSourceMap; - _dataSourceMetadataMap = new HashMap<>(dataSourceMap.size()); - for (Map.Entry<String, DataSource> entry : dataSourceMap.entrySet()) { - _dataSourceMetadataMap.put(entry.getKey(), entry.getValue().getDataSourceMetadata()); - } _docIdSetOperator = docIdSetOperator; _dataBlockCache = new DataBlockCache(new DataFetcher(dataSourceMap)); } @@ -66,7 +59,7 @@ public class ProjectionOperator extends BaseOperator<ProjectionBlock> { return null; } else { _dataBlockCache.initNewBlock(docIdSetBlock.getDocIdSet(), docIdSetBlock.getSearchableLength()); - return new ProjectionBlock(_dataSourceMetadataMap, _dataBlockCache, docIdSetBlock); + return new ProjectionBlock(_dataSourceMap, _dataBlockCache); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java index c3b8774..989a870 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/ProjectionBlock.java @@ -25,9 +25,8 @@ import org.apache.pinot.core.common.BlockDocIdValueSet; import org.apache.pinot.core.common.BlockMetadata; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.DataBlockCache; -import org.apache.pinot.core.common.DataSourceMetadata; +import org.apache.pinot.core.common.DataSource; import org.apache.pinot.core.operator.docvalsets.ProjectionBlockValSet; -import org.apache.pinot.spi.data.FieldSpec; /** @@ -35,24 +34,20 @@ import org.apache.pinot.spi.data.FieldSpec; * It provides DocIdSetBlock for a given column. */ public class ProjectionBlock implements Block { - private final Map<String, DataSourceMetadata> _dataSourceMetadataMap; - private final DocIdSetBlock _docIdSetBlock; + private final Map<String, DataSource> _dataSourceMap; private final DataBlockCache _dataBlockCache; - public ProjectionBlock(Map<String, DataSourceMetadata> dataSourceMetadataMap, DataBlockCache dataBlockCache, DocIdSetBlock docIdSetBlock) { - _dataSourceMetadataMap = dataSourceMetadataMap; - _docIdSetBlock = docIdSetBlock; + public ProjectionBlock(Map<String, DataSource> dataSourceMap, DataBlockCache dataBlockCache) { + _dataSourceMap = dataSourceMap; _dataBlockCache = dataBlockCache; } - @Override - public BlockValSet getBlockValueSet() { - throw new UnsupportedOperationException(); + public int getNumDocs() { + return _dataBlockCache.getNumDocs(); } - @Override - public BlockDocIdValueSet getBlockDocIdValueSet() { - throw new UnsupportedOperationException(); + public BlockValSet getBlockValueSet(String column) { + return new ProjectionBlockValSet(_dataBlockCache, column, _dataSourceMap.get(column)); } @Override @@ -61,21 +56,17 @@ public class ProjectionBlock implements Block { } @Override - public BlockMetadata getMetadata() { + public BlockValSet getBlockValueSet() { throw new UnsupportedOperationException(); } - public BlockValSet getBlockValueSet(String column) { - FieldSpec fieldSpec = _dataSourceMetadataMap.get(column).getFieldSpec(); - return new ProjectionBlockValSet(_dataBlockCache, column, fieldSpec.getDataType(), - fieldSpec.isSingleValueField()); - } - - public DocIdSetBlock getDocIdSetBlock() { - return _docIdSetBlock; + @Override + public BlockDocIdValueSet getBlockDocIdValueSet() { + throw new UnsupportedOperationException(); } - public int getNumDocs() { - return _docIdSetBlock.getSearchableLength(); + @Override + public BlockMetadata getMetadata() { + throw new UnsupportedOperationException(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java index 7d78e07..63f57b6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/TransformBlock.java @@ -78,8 +78,4 @@ public class TransformBlock implements Block { public BlockMetadata getMetadata() { throw new UnsupportedOperationException(); } - - public DocIdSetBlock getDocIdSetBlock() { - return _projectionBlock.getDocIdSetBlock(); - } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java index efdb41b..c355131 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java @@ -18,9 +18,12 @@ */ package org.apache.pinot.core.operator.docvalsets; +import javax.annotation.Nullable; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.common.DataBlockCache; +import org.apache.pinot.core.common.DataSource; import org.apache.pinot.core.operator.ProjectionOperator; +import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.spi.data.FieldSpec.DataType; @@ -32,32 +35,33 @@ import org.apache.pinot.spi.data.FieldSpec.DataType; public class ProjectionBlockValSet implements BlockValSet { private final DataBlockCache _dataBlockCache; private final String _column; - private final DataType _dataType; - private final boolean _singleValue; + private final DataSource _dataSource; /** * Constructor for the class. - * The dataBlockCache argument is initialized in {@link ProjectionOperator}, - * so that it can be reused across multiple calls to {@link ProjectionOperator#nextBlock()}. - * - * @param dataBlockCache data block cache - * @param column Projection column. + * The dataBlockCache is initialized in {@link ProjectionOperator} so that it can be reused across multiple calls to + * {@link ProjectionOperator#nextBlock()}. */ - public ProjectionBlockValSet(DataBlockCache dataBlockCache, String column, DataType dataType, boolean singleValue) { + public ProjectionBlockValSet(DataBlockCache dataBlockCache, String column, DataSource dataSource) { _dataBlockCache = dataBlockCache; _column = column; - _dataType = dataType; - _singleValue = singleValue; + _dataSource = dataSource; } @Override public DataType getValueType() { - return _dataType; + return _dataSource.getDataSourceMetadata().getDataType(); } @Override public boolean isSingleValue() { - return _singleValue; + return _dataSource.getDataSourceMetadata().isSingleValue(); + } + + @Nullable + @Override + public Dictionary getDictionary() { + return _dataSource.getDictionary(); } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java index 78fa38a..fd82adb 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java @@ -18,11 +18,13 @@ */ package org.apache.pinot.core.operator.docvalsets; +import javax.annotation.Nullable; import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.operator.blocks.ProjectionBlock; import org.apache.pinot.core.operator.transform.TransformResultMetadata; import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; +import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.spi.data.FieldSpec; @@ -52,6 +54,12 @@ public class TransformBlockValSet implements BlockValSet { return _transformFunction.getResultMetadata().isSingleValue(); } + @Nullable + @Override + public Dictionary getDictionary() { + return _transformFunction.getDictionary(); + } + @Override public int[] getDictionaryIdsSV() { return _transformFunction.transformToDictIdsSV(_projectionBlock); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java index 7e7ba4b..c13c8c0 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountAggregationFunction.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.query.aggregation.function; +import it.unimi.dsi.fastutil.ints.IntIterator; import it.unimi.dsi.fastutil.ints.IntOpenHashSet; import java.util.Arrays; import java.util.Map; @@ -29,10 +30,12 @@ import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder; import org.apache.pinot.core.query.request.context.ExpressionContext; -import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.core.segment.index.readers.Dictionary; +import org.apache.pinot.spi.data.FieldSpec.DataType; public class DistinctCountAggregationFunction extends BaseSingleInputAggregationFunction<IntOpenHashSet, Integer> { + protected Dictionary _dictionary; public DistinctCountAggregationFunction(ExpressionContext expression) { super(expression); @@ -64,7 +67,19 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation BlockValSet blockValSet = blockValSetMap.get(_expression); IntOpenHashSet valueSet = getValueSet(aggregationResultHolder); - FieldSpec.DataType valueType = blockValSet.getValueType(); + // For dictionary-encoded expression, store dictionary ids into the value set + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + _dictionary = dictionary; + int[] dictIds = blockValSet.getDictionaryIdsSV(); + for (int i = 0; i < length; i++) { + valueSet.add(dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store hash code of the values into the value set + DataType valueType = blockValSet.getValueType(); switch (valueType) { case INT: int[] intValues = blockValSet.getIntValuesSV(); @@ -111,43 +126,55 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { BlockValSet blockValSet = blockValSetMap.get(_expression); - FieldSpec.DataType valueType = blockValSet.getValueType(); + // For dictionary-encoded expression, store dictionary ids into the value set + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + _dictionary = dictionary; + int[] dictIds = blockValSet.getDictionaryIdsSV(); + for (int i = 0; i < length; i++) { + getValueSet(groupByResultHolder, groupKeyArray[i]).add(dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store hash code of the values into the value set + DataType valueType = blockValSet.getValueType(); switch (valueType) { case INT: int[] intValues = blockValSet.getIntValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKey(groupByResultHolder, groupKeyArray[i], intValues[i]); + getValueSet(groupByResultHolder, groupKeyArray[i]).add(intValues[i]); } break; case LONG: long[] longValues = blockValSet.getLongValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Long.hashCode(longValues[i])); + getValueSet(groupByResultHolder, groupKeyArray[i]).add(Long.hashCode(longValues[i])); } break; case FLOAT: float[] floatValues = blockValSet.getFloatValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Float.hashCode(floatValues[i])); + getValueSet(groupByResultHolder, groupKeyArray[i]).add(Float.hashCode(floatValues[i])); } break; case DOUBLE: double[] doubleValues = blockValSet.getDoubleValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Double.hashCode(doubleValues[i])); + getValueSet(groupByResultHolder, groupKeyArray[i]).add(Double.hashCode(doubleValues[i])); } break; case STRING: String[] stringValues = blockValSet.getStringValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKey(groupByResultHolder, groupKeyArray[i], stringValues[i].hashCode()); + getValueSet(groupByResultHolder, groupKeyArray[i]).add(stringValues[i].hashCode()); } break; case BYTES: byte[][] bytesValues = blockValSet.getBytesValuesSV(); for (int i = 0; i < length; i++) { - setValueForGroupKey(groupByResultHolder, groupKeyArray[i], Arrays.hashCode(bytesValues[i])); + getValueSet(groupByResultHolder, groupKeyArray[i]).add(Arrays.hashCode(bytesValues[i])); } break; default: @@ -160,7 +187,19 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation Map<ExpressionContext, BlockValSet> blockValSetMap) { BlockValSet blockValSet = blockValSetMap.get(_expression); - FieldSpec.DataType valueType = blockValSet.getValueType(); + // For dictionary-encoded expression, store dictionary ids into the value set + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + _dictionary = dictionary; + int[] dictIds = blockValSet.getDictionaryIdsSV(); + for (int i = 0; i < length; i++) { + setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store hash code of the values into the value set + DataType valueType = blockValSet.getValueType(); switch (valueType) { case INT: int[] intValues = blockValSet.getIntValuesSV(); @@ -192,6 +231,12 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], stringValues[i].hashCode()); } break; + case BYTES: + byte[][] bytesValues = blockValSet.getBytesValuesSV(); + for (int i = 0; i < length; i++) { + setValueForGroupKeys(groupByResultHolder, groupKeysArray[i], Arrays.hashCode(bytesValues[i])); + } + break; default: throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType); } @@ -202,7 +247,13 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation IntOpenHashSet valueSet = aggregationResultHolder.getResult(); if (valueSet == null) { return new IntOpenHashSet(); + } + + if (_dictionary != null) { + // For dictionary-encoded expression, convert dictionary ids to hash code of the values + return convertToValueSet(valueSet, _dictionary); } else { + // For non-dictionary-encoded expression, directly return the value set return valueSet; } } @@ -212,7 +263,13 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation IntOpenHashSet valueSet = groupByResultHolder.getResult(groupKey); if (valueSet == null) { return new IntOpenHashSet(); + } + + if (_dictionary != null) { + // For dictionary-encoded expression, convert dictionary ids to hash code of the values + return convertToValueSet(valueSet, _dictionary); } else { + // For non-dictionary-encoded expression, directly return the value set return valueSet; } } @@ -244,35 +301,7 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation } /** - * Helper method to set value for a groupKey into the result holder. - * - * @param groupByResultHolder Result holder - * @param groupKey Group-key for which to set the value - * @param value Value for the group key - */ - private void setValueForGroupKey(GroupByResultHolder groupByResultHolder, int groupKey, int value) { - IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey); - valueSet.add(value); - } - - /** - * Helper method to set values for a given array of groupKeys, into the result holder. - * - * @param groupByResultHolder Result holder - * @param groupKeys Array of group keys for which to set the value - * @param value Value to be set - */ - private void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) { - for (int groupKey : groupKeys) { - setValueForGroupKey(groupByResultHolder, groupKey, value); - } - } - - /** * Returns the value set from the result holder or creates a new one if it does not exist. - * - * @param aggregationResultHolder Result holder - * @return Value set from the result holder */ protected static IntOpenHashSet getValueSet(AggregationResultHolder aggregationResultHolder) { IntOpenHashSet valueSet = aggregationResultHolder.getResult(); @@ -284,11 +313,7 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation } /** - * Returns the value set for the given group key. If one does not exist, creates a new one and returns that. - * - * @param groupByResultHolder Result holder - * @param groupKey Group key for which to return the value set - * @return Value set for the group key + * Returns the value set for the given group key or creates a new one if it does not exist. */ protected static IntOpenHashSet getValueSet(GroupByResultHolder groupByResultHolder, int groupKey) { IntOpenHashSet valueSet = groupByResultHolder.getResult(groupKey); @@ -298,4 +323,58 @@ public class DistinctCountAggregationFunction extends BaseSingleInputAggregation } return valueSet; } + + /** + * Helper method to set value for the given group keys into the result holder. + */ + private static void setValueForGroupKeys(GroupByResultHolder groupByResultHolder, int[] groupKeys, int value) { + for (int groupKey : groupKeys) { + getValueSet(groupByResultHolder, groupKey).add(value); + } + } + + /** + * Helper method to read dictionary and convert dictionary ids to hash code of the values for dictionary-encoded + * expression. + */ + private static IntOpenHashSet convertToValueSet(IntOpenHashSet dictIdSet, Dictionary dictionary) { + IntOpenHashSet valueSet = new IntOpenHashSet(dictIdSet.size()); + IntIterator iterator = dictIdSet.iterator(); + DataType valueType = dictionary.getValueType(); + switch (valueType) { + case INT: + while (iterator.hasNext()) { + valueSet.add(dictionary.getIntValue(iterator.nextInt())); + } + break; + case LONG: + while (iterator.hasNext()) { + valueSet.add(Long.hashCode(dictionary.getLongValue(iterator.nextInt()))); + } + break; + case FLOAT: + while (iterator.hasNext()) { + valueSet.add(Float.hashCode(dictionary.getFloatValue(iterator.nextInt()))); + } + break; + case DOUBLE: + while (iterator.hasNext()) { + valueSet.add(Double.hashCode(dictionary.getDoubleValue(iterator.nextInt()))); + } + break; + case STRING: + while (iterator.hasNext()) { + valueSet.add(dictionary.getStringValue(iterator.nextInt()).hashCode()); + } + break; + case BYTES: + while (iterator.hasNext()) { + valueSet.add(Arrays.hashCode(dictionary.getBytesValue(iterator.nextInt()))); + } + break; + default: + throw new IllegalStateException("Illegal data type for DISTINCT_COUNT aggregation function: " + valueType); + } + return valueSet; + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java index 4dba11e..7d8d5d4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctCountMVAggregationFunction.java @@ -25,6 +25,7 @@ import org.apache.pinot.core.common.BlockValSet; import org.apache.pinot.core.query.aggregation.AggregationResultHolder; import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder; import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.segment.index.readers.Dictionary; import org.apache.pinot.spi.data.FieldSpec; @@ -47,9 +48,23 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation @Override public void aggregate(int length, AggregationResultHolder aggregationResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); IntOpenHashSet valueSet = getValueSet(aggregationResultHolder); - BlockValSet blockValSet = blockValSetMap.get(_expression); + // For dictionary-encoded expression, store dictionary ids into the value set + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + _dictionary = dictionary; + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + for (int dictId : dictIds[i]) { + valueSet.add(dictId); + } + } + return; + } + + // For non-dictionary-encoded expression, store hash code of the values into the value set FieldSpec.DataType valueType = blockValSet.getValueType(); switch (valueType) { case INT: @@ -100,8 +115,23 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { BlockValSet blockValSet = blockValSetMap.get(_expression); - FieldSpec.DataType valueType = blockValSet.getValueType(); + // For dictionary-encoded expression, store dictionary ids into the value set + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + _dictionary = dictionary; + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKeyArray[i]); + for (int dictId : dictIds[i]) { + valueSet.add(dictId); + } + } + return; + } + + // For non-dictionary-encoded expression, store hash code of the values into the value set + FieldSpec.DataType valueType = blockValSet.getValueType(); switch (valueType) { case INT: int[][] intValues = blockValSet.getIntValuesMV(); @@ -157,8 +187,25 @@ public class DistinctCountMVAggregationFunction extends DistinctCountAggregation public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, Map<ExpressionContext, BlockValSet> blockValSetMap) { BlockValSet blockValSet = blockValSetMap.get(_expression); - FieldSpec.DataType valueType = blockValSet.getValueType(); + // For dictionary-encoded expression, store dictionary ids into the value set + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + _dictionary = dictionary; + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + IntOpenHashSet valueSet = getValueSet(groupByResultHolder, groupKey); + for (int dictId : dictIds[i]) { + valueSet.add(dictId); + } + } + } + return; + } + + // For non-dictionary-encoded expression, store hash code of the values into the value set + FieldSpec.DataType valueType = blockValSet.getValueType(); switch (valueType) { case INT: int[][] intValues = blockValSet.getIntValuesMV(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org