siddharthteotia commented on code in PR #10128: URL: https://github.com/apache/pinot/pull/10128#discussion_r1073858511
########## pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java: ########## @@ -259,68 +478,90 @@ public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResult } } - @Override - public Set extractAggregationResult(AggregationResultHolder aggregationResultHolder) { - Object result = aggregationResultHolder.getResult(); - if (result == null) { - // Use empty IntOpenHashSet as a place holder for empty result - return new IntOpenHashSet(); - } - - if (result instanceof DictIdsWrapper) { - // For dictionary-encoded expression, convert dictionary ids to values - return convertToValueSet((DictIdsWrapper) result); - } else { - // For non-dictionary-encoded expression, directly return the value set - return (Set) result; - } - } - - @Override - public Set extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) { - Object result = groupByResultHolder.getResult(groupKey); - if (result == null) { - // NOTE: Return an empty IntOpenHashSet for empty result. - return new IntOpenHashSet(); - } - - if (result instanceof DictIdsWrapper) { - // For dictionary-encoded expression, convert dictionary ids to values - return convertToValueSet((DictIdsWrapper) result); - } else { - // For non-dictionary-encoded expression, directly return the value set - return (Set) result; - } - } + /** + * Performs aggregation for a MV column with group by on a MV column. + */ + protected void mvAggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); - @Override - public Set merge(Set intermediateResult1, Set intermediateResult2) { - if (intermediateResult1.isEmpty()) { - return intermediateResult2; - } - if (intermediateResult2.isEmpty()) { - return intermediateResult1; + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + getDictIdBitmap(groupByResultHolder, groupKey, dictionary).add(dictIds[i]); + } + } + return; } - intermediateResult1.addAll(intermediateResult2); - return intermediateResult1; - } - @Override - public ColumnDataType getIntermediateResultColumnType() { - return ColumnDataType.OBJECT; - } - - /** - * Returns the dictionary id bitmap from the result holder or creates a new one if it does not exist. - */ - protected static RoaringBitmap getDictIdBitmap(AggregationResultHolder aggregationResultHolder, - Dictionary dictionary) { - DictIdsWrapper dictIdsWrapper = aggregationResultHolder.getResult(); - if (dictIdsWrapper == null) { - dictIdsWrapper = new DictIdsWrapper(dictionary); - aggregationResultHolder.setValue(dictIdsWrapper); + // For non-dictionary-encoded expression, store hash code of the values into the value set + DataType storedType = blockValSet.getValueType().getStoredType(); + switch (storedType) { + case INT: + int[][] intValues = blockValSet.getIntValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + IntOpenHashSet intSet = (IntOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.INT); + for (int value : intValues[i]) { + intSet.add(value); + } + } + } + break; + case LONG: + long[][] longValues = blockValSet.getLongValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + LongOpenHashSet longSet = (LongOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.LONG); + for (long value : longValues[i]) { + longSet.add(value); + } + } + } + break; + case FLOAT: + float[][] floatValues = blockValSet.getFloatValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + FloatOpenHashSet floatSet = (FloatOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.FLOAT); + for (float value : floatValues[i]) { + floatSet.add(value); + } + } + } + break; + case DOUBLE: + double[][] doubleValues = blockValSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + DoubleOpenHashSet doubleSet = + (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKey, DataType.DOUBLE); + for (double value : doubleValues[i]) { + doubleSet.add(value); + } + } + } + break; + case STRING: + String[][] stringValues = blockValSet.getStringValuesMV(); + for (int i = 0; i < length; i++) { + for (int groupKey : groupKeysArray[i]) { + ObjectOpenHashSet<String> stringSet = + (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKey, DataType.STRING); + //noinspection ManualArrayToCollectionCopy + for (String value : stringValues[i]) { + //noinspection UseBulkOperation + stringSet.add(value); + } + } + } + break; + default: + throw new IllegalStateException("Illegal data type for DISTINCT_COUNT_MV aggregation function: " + storedType); Review Comment: Same here ########## pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/BaseDistinctAggregateAggregationFunction.java: ########## @@ -199,8 +341,85 @@ public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHol } } - @Override - public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder, + /** + * Performs aggregation for a MV column with group by on a SV column. + */ + protected void mvAggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder, + Map<ExpressionContext, BlockValSet> blockValSetMap) { + BlockValSet blockValSet = blockValSetMap.get(_expression); + + // For dictionary-encoded expression, store dictionary ids into the bitmap + Dictionary dictionary = blockValSet.getDictionary(); + if (dictionary != null) { + int[][] dictIds = blockValSet.getDictionaryIdsMV(); + for (int i = 0; i < length; i++) { + getDictIdBitmap(groupByResultHolder, groupKeyArray[i], dictionary).add(dictIds[i]); + } + return; + } + + // For non-dictionary-encoded expression, store values into the value set + DataType storedType = blockValSet.getValueType().getStoredType(); + switch (storedType) { + case INT: + int[][] intValues = blockValSet.getIntValuesMV(); + for (int i = 0; i < length; i++) { + IntOpenHashSet intSet = (IntOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.INT); + for (int value : intValues[i]) { + intSet.add(value); + } + } + break; + case LONG: + long[][] longValues = blockValSet.getLongValuesMV(); + for (int i = 0; i < length; i++) { + LongOpenHashSet longSet = (LongOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.LONG); + for (long value : longValues[i]) { + longSet.add(value); + } + } + break; + case FLOAT: + float[][] floatValues = blockValSet.getFloatValuesMV(); + for (int i = 0; i < length; i++) { + FloatOpenHashSet floatSet = + (FloatOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.FLOAT); + for (float value : floatValues[i]) { + floatSet.add(value); + } + } + break; + case DOUBLE: + double[][] doubleValues = blockValSet.getDoubleValuesMV(); + for (int i = 0; i < length; i++) { + DoubleOpenHashSet doubleSet = + (DoubleOpenHashSet) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.DOUBLE); + for (double value : doubleValues[i]) { + doubleSet.add(value); + } + } + break; + case STRING: + String[][] stringValues = blockValSet.getStringValuesMV(); + for (int i = 0; i < length; i++) { + ObjectOpenHashSet<String> stringSet = + (ObjectOpenHashSet<String>) getValueSet(groupByResultHolder, groupKeyArray[i], DataType.STRING); + //noinspection ManualArrayToCollectionCopy + for (String value : stringValues[i]) { + //noinspection UseBulkOperation + stringSet.add(value); + } + } + break; + default: + throw new IllegalStateException("Illegal data type for DISTINCT_COUNT_MV aggregation function: " + storedType); Review Comment: Same here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org