This is an automated email from the ASF dual-hosted git repository. manishswaminathan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 442c0fc713 Configurable sketch accuracy in merge rollup task (#14373) 442c0fc713 is described below commit 442c0fc71343d8f8ff152a818867f03d240bb758 Author: David Cromberge <davecrombe...@gmail.com> AuthorDate: Tue Dec 10 11:05:46 2024 +0200 Configurable sketch accuracy in merge rollup task (#14373) * Configurable sketch accuracy in merge rollup task * Run mvn spotless:apply --- .../apache/pinot/core/common/MinionConstants.java | 1 + .../DistinctCountCPCSketchAggregator.java | 3 ++- .../aggregator/DistinctCountHLLAggregator.java | 3 ++- .../DistinctCountThetaSketchAggregator.java | 24 ++++++++++++----- .../aggregator/DistinctCountULLAggregator.java | 3 ++- .../aggregator/IntegerTupleSketchAggregator.java | 21 +++++++++++++-- .../processing/aggregator/MaxValueAggregator.java | 3 ++- .../processing/aggregator/MinValueAggregator.java | 3 ++- .../processing/aggregator/SumValueAggregator.java | 3 ++- .../processing/aggregator/ValueAggregator.java | 5 +++- .../framework/SegmentProcessorConfig.java | 23 +++++++++++++++-- .../segment/processing/reducer/ReducerFactory.java | 3 ++- .../segment/processing/reducer/RollupReducer.java | 20 +++++++++++---- .../pinot/plugin/minion/tasks/MergeTaskUtils.java | 22 ++++++++++++++++ .../tasks/mergerollup/MergeRollupTaskExecutor.java | 4 +++ .../tasks/mergerollup/MergeRollupTaskUtils.java | 15 +++++++++++ .../RealtimeToOfflineSegmentsTaskExecutor.java | 4 +++ .../plugin/minion/tasks/MergeTaskUtilsTest.java | 30 +++++++++++++++++----- .../mergerollup/MergeRollupTaskUtilsTest.java | 24 +++++++++++++++++ 19 files changed, 185 insertions(+), 29 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java index 3c4b9b322f..48349099b4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java @@ -103,6 +103,7 @@ public class MinionConstants { // Merge config public static final String MERGE_TYPE_KEY = "mergeType"; public static final String AGGREGATION_TYPE_KEY_SUFFIX = ".aggregationType"; + public static final String AGGREGATION_FUNCTION_PARAMETERS_PREFIX = "aggregationFunctionParameters."; public static final String MODE = "mode"; public static final String PROCESS_FROM_WATERMARK_MODE = "processFromWatermark"; public static final String PROCESS_ALL_MODE = "processAll"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java index 82e9a74161..73985f564d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.datasketches.cpc.CpcSketch; import org.apache.datasketches.cpc.CpcUnion; import org.apache.pinot.core.common.ObjectSerDeUtils; @@ -30,7 +31,7 @@ public class DistinctCountCPCSketchAggregator implements ValueAggregator { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) { CpcSketch first = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value1); CpcSketch second = ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value2); CpcSketch result; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java index 4eecbe3696..940d356a95 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java @@ -20,12 +20,13 @@ package org.apache.pinot.core.segment.processing.aggregator; import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; import com.clearspring.analytics.stream.cardinality.HyperLogLog; +import java.util.Map; import org.apache.pinot.core.common.ObjectSerDeUtils; public class DistinctCountHLLAggregator implements ValueAggregator { @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) { try { HyperLogLog first = ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) value1); HyperLogLog second = ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) value2); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java index b11f7d7b00..f22e38ed3c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java @@ -18,26 +18,38 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.datasketches.theta.Sketch; import org.apache.datasketches.theta.Union; import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.segment.spi.Constants; import org.apache.pinot.spi.utils.CommonConstants; public class DistinctCountThetaSketchAggregator implements ValueAggregator { - private final Union _union; - public DistinctCountThetaSketchAggregator() { - // TODO: Handle configurable nominal entries - _union = Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion(); } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) { + String nominalEntriesParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES); + + int sketchNominalEntries; + + // Check if nominal entries values match + if (nominalEntriesParam != null) { + sketchNominalEntries = Integer.parseInt(nominalEntriesParam); + } else { + // If the functionParameters don't have an explicit nominal entries value set, + // use the default value for nominal entries + sketchNominalEntries = CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES; + } + + Union union = Union.builder().setNominalEntries(sketchNominalEntries).buildUnion(); Sketch first = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value1); Sketch second = ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value2); - Sketch result = _union.union(first, second); + Sketch result = union.union(first, second); return ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(result); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java index 2a51ac052b..70469f8cf4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java @@ -19,12 +19,13 @@ package org.apache.pinot.core.segment.processing.aggregator; import com.dynatrace.hash4j.distinctcount.UltraLogLog; +import java.util.Map; import org.apache.pinot.core.common.ObjectSerDeUtils; public class DistinctCountULLAggregator implements ValueAggregator { @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) { UltraLogLog first = ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize((byte[]) value1); UltraLogLog second = ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize((byte[]) value2); // add to the one with a larger P and return that diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java index 8bdf7f8a86..b7df4c05fe 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java @@ -18,11 +18,14 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.datasketches.tuple.Sketch; import org.apache.datasketches.tuple.Union; import org.apache.datasketches.tuple.aninteger.IntegerSummary; import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations; import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.segment.spi.Constants; +import org.apache.pinot.spi.utils.CommonConstants; public class IntegerTupleSketchAggregator implements ValueAggregator { @@ -33,10 +36,24 @@ public class IntegerTupleSketchAggregator implements ValueAggregator { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) { + String nominalEntriesParam = functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES); + + int sketchNominalEntries; + + // Check if nominal entries values match + if (nominalEntriesParam != null) { + sketchNominalEntries = Integer.parseInt(nominalEntriesParam); + } else { + // If the functionParameters don't have an explicit nominal entries value set, + // use the default value for nominal entries + sketchNominalEntries = (int) Math.pow(2, CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK); + } + Sketch<IntegerSummary> first = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1); Sketch<IntegerSummary> second = ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2); - Sketch<IntegerSummary> result = new Union<>(new IntegerSummarySetOperations(_mode, _mode)).union(first, second); + Sketch<IntegerSummary> result = + new Union<>(sketchNominalEntries, new IntegerSummarySetOperations(_mode, _mode)).union(first, second); return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java index 6a231b036c..1c4fa5a498 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.pinot.spi.data.FieldSpec; @@ -33,7 +34,7 @@ public class MaxValueAggregator implements ValueAggregator { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) { Object result; switch (_dataType) { case INT: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java index 9352cc99d0..8914dfa7c8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.pinot.spi.data.FieldSpec; @@ -33,7 +34,7 @@ public class MinValueAggregator implements ValueAggregator { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) { Object result; switch (_dataType) { case INT: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java index 0570cca1b5..8b7d57d889 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; import org.apache.pinot.spi.data.FieldSpec; @@ -33,7 +34,7 @@ public class SumValueAggregator implements ValueAggregator { } @Override - public Object aggregate(Object value1, Object value2) { + public Object aggregate(Object value1, Object value2, Map<String, String> functionParameters) { Object result; switch (_dataType) { case INT: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java index 016e0fb091..70d90dd100 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java @@ -18,6 +18,9 @@ */ package org.apache.pinot.core.segment.processing.aggregator; +import java.util.Map; + + /** * Interface for value aggregator */ @@ -27,5 +30,5 @@ public interface ValueAggregator { * Given two values, return the aggregated value * @return aggregated value given two column values */ - Object aggregate(Object value1, Object value2); + Object aggregate(Object value1, Object value2, Map<String, String> functionParameters); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java index 053f78b6f3..56009608ee 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java @@ -44,12 +44,14 @@ public class SegmentProcessorConfig { private final List<PartitionerConfig> _partitionerConfigs; private final MergeType _mergeType; private final Map<String, AggregationFunctionType> _aggregationTypes; + private final Map<String, Map<String, String>> _aggregationFunctionParameters; private final SegmentConfig _segmentConfig; private final Consumer<Object> _progressObserver; private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, TimeHandlerConfig timeHandlerConfig, List<PartitionerConfig> partitionerConfigs, MergeType mergeType, - Map<String, AggregationFunctionType> aggregationTypes, SegmentConfig segmentConfig, + Map<String, AggregationFunctionType> aggregationTypes, + Map<String, Map<String, String>> aggregationFunctionParameters, SegmentConfig segmentConfig, Consumer<Object> progressObserver) { TimestampIndexUtils.applyTimestampIndex(tableConfig, schema); _tableConfig = tableConfig; @@ -58,6 +60,7 @@ public class SegmentProcessorConfig { _partitionerConfigs = partitionerConfigs; _mergeType = mergeType; _aggregationTypes = aggregationTypes; + _aggregationFunctionParameters = aggregationFunctionParameters; _segmentConfig = segmentConfig; _progressObserver = (progressObserver != null) ? progressObserver : p -> { // Do nothing. @@ -106,6 +109,13 @@ public class SegmentProcessorConfig { return _aggregationTypes; } + /** + * The aggregation function parameters for the SegmentProcessorFramework's reduce phase with ROLLUP merge type + */ + public Map<String, Map<String, String>> getAggregationFunctionParameters() { + return _aggregationFunctionParameters; + } + /** * The SegmentConfig for the SegmentProcessorFramework's reduce phase */ @@ -134,6 +144,7 @@ public class SegmentProcessorConfig { private List<PartitionerConfig> _partitionerConfigs; private MergeType _mergeType; private Map<String, AggregationFunctionType> _aggregationTypes; + private Map<String, Map<String, String>> _aggregationFunctionParameters; private SegmentConfig _segmentConfig; private Consumer<Object> _progressObserver; @@ -167,6 +178,11 @@ public class SegmentProcessorConfig { return this; } + public Builder setAggregationFunctionParameters(Map<String, Map<String, String>> aggregationFunctionParameters) { + _aggregationFunctionParameters = aggregationFunctionParameters; + return this; + } + public Builder setSegmentConfig(SegmentConfig segmentConfig) { _segmentConfig = segmentConfig; return this; @@ -193,11 +209,14 @@ public class SegmentProcessorConfig { if (_aggregationTypes == null) { _aggregationTypes = Collections.emptyMap(); } + if (_aggregationFunctionParameters == null) { + _aggregationFunctionParameters = Collections.emptyMap(); + } if (_segmentConfig == null) { _segmentConfig = new SegmentConfig.Builder().build(); } return new SegmentProcessorConfig(_tableConfig, _schema, _timeHandlerConfig, _partitionerConfigs, _mergeType, - _aggregationTypes, _segmentConfig, _progressObserver); + _aggregationTypes, _aggregationFunctionParameters, _segmentConfig, _progressObserver); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java index a205500e34..59fca478fa 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java @@ -38,7 +38,8 @@ public class ReducerFactory { case CONCAT: return new ConcatReducer(fileManager); case ROLLUP: - return new RollupReducer(partitionId, fileManager, processorConfig.getAggregationTypes(), reducerOutputDir); + return new RollupReducer(partitionId, fileManager, processorConfig.getAggregationTypes(), + processorConfig.getAggregationFunctionParameters(), reducerOutputDir); case DEDUP: return new DedupReducer(partitionId, fileManager, reducerOutputDir); default: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java index ae88120f20..fdd1a67173 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.segment.processing.reducer; import java.io.File; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import org.apache.commons.io.FileUtils; @@ -47,14 +48,17 @@ public class RollupReducer implements Reducer { private final String _partitionId; private final GenericRowFileManager _fileManager; private final Map<String, AggregationFunctionType> _aggregationTypes; + private final Map<String, Map<String, String>> _aggregationFunctionParameters; private final File _reducerOutputDir; private GenericRowFileManager _rollupFileManager; public RollupReducer(String partitionId, GenericRowFileManager fileManager, - Map<String, AggregationFunctionType> aggregationTypes, File reducerOutputDir) { + Map<String, AggregationFunctionType> aggregationTypes, + Map<String, Map<String, String>> aggregationFunctionParameters, File reducerOutputDir) { _partitionId = partitionId; _fileManager = fileManager; _aggregationTypes = aggregationTypes; + _aggregationFunctionParameters = aggregationFunctionParameters; _reducerOutputDir = reducerOutputDir; } @@ -91,7 +95,8 @@ public class RollupReducer implements Reducer { for (FieldSpec fieldSpec : fieldSpecs) { if (fieldSpec.getFieldType() == FieldType.METRIC) { aggregatorContextList.add(new AggregatorContext(fieldSpec, - _aggregationTypes.getOrDefault(fieldSpec.getName(), DEFAULT_AGGREGATOR_TYPE))); + _aggregationTypes.getOrDefault(fieldSpec.getName(), DEFAULT_AGGREGATOR_TYPE), + _aggregationFunctionParameters.getOrDefault(fieldSpec.getName(), Collections.emptyMap()))); } } @@ -159,7 +164,8 @@ public class RollupReducer implements Reducer { } else { // Non-null field, aggregate the value aggregatedRow.putValue(column, - aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column))); + aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column), + aggregatorContext._functionParameters)); } } } @@ -169,17 +175,21 @@ public class RollupReducer implements Reducer { for (AggregatorContext aggregatorContext : aggregatorContextList) { String column = aggregatorContext._column; aggregatedRow.putValue(column, - aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column))); + aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), rowToAggregate.getValue(column), + aggregatorContext._functionParameters)); } } private static class AggregatorContext { final String _column; final ValueAggregator _aggregator; + final Map<String, String> _functionParameters; - AggregatorContext(FieldSpec fieldSpec, AggregationFunctionType aggregationType) { + AggregatorContext(FieldSpec fieldSpec, AggregationFunctionType aggregationType, + Map<String, String> functionParameters) { _column = fieldSpec.getName(); _aggregator = ValueAggregatorFactory.getValueAggregator(aggregationType, fieldSpec.getDataType()); + _functionParameters = functionParameters; } } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java index 34bf2e5ecf..43f951629b 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java @@ -137,6 +137,28 @@ public class MergeTaskUtils { return aggregationTypes; } + /** + * Returns a map from column name to the aggregation function parameters associated with it based on the task config. + */ + public static Map<String, Map<String, String>> getAggregationFunctionParameters(Map<String, String> taskConfig) { + Map<String, Map<String, String>> aggregationFunctionParameters = new HashMap<>(); + String prefix = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX; + + for (Map.Entry<String, String> entry : taskConfig.entrySet()) { + String key = entry.getKey(); + String value = entry.getValue(); + if (key.startsWith(prefix)) { + String[] parts = key.substring(prefix.length()).split("\\.", 2); + if (parts.length == 2) { + String metricColumn = parts[0]; + String paramName = parts[1]; + aggregationFunctionParameters.computeIfAbsent(metricColumn, k -> new HashMap<>()).put(paramName, value); + } + } + } + return aggregationFunctionParameters; + } + /** * Returns the segment config based on the task config. */ diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java index bebedb3ff2..455849f648 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java @@ -91,6 +91,10 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu // Aggregation types segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs)); + // Aggregation function parameters + segmentProcessorConfigBuilder.setAggregationFunctionParameters( + MergeTaskUtils.getAggregationFunctionParameters(configs)); + // Segment config segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs)); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java index c3c7720e1b..b040b54d53 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java @@ -23,6 +23,8 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import java.util.stream.Collectors; import org.apache.pinot.core.common.MinionConstants; import org.apache.pinot.core.common.MinionConstants.MergeTask; @@ -51,12 +53,25 @@ public class MergeRollupTaskUtils { */ public static Map<String, Map<String, String>> getLevelToConfigMap(Map<String, String> taskConfig) { Map<String, Map<String, String>> levelToConfigMap = new TreeMap<>(); + + // Regex to match aggregation function parameter keys + Pattern pattern = Pattern.compile("(\\w+)\\.aggregationFunctionParameters\\.(\\w+)\\.(\\w+)"); + for (Map.Entry<String, String> entry : taskConfig.entrySet()) { String key = entry.getKey(); for (String configKey : VALID_CONFIG_KEYS) { if (key.endsWith(configKey)) { String level = key.substring(0, key.length() - configKey.length() - 1); levelToConfigMap.computeIfAbsent(level, k -> new TreeMap<>()).put(configKey, entry.getValue()); + } else { + Matcher matcher = pattern.matcher(key); + if (matcher.matches()) { + String level = matcher.group(1).trim(); // e.g., "1day" or "1hour" + String metric = matcher.group(2).trim(); // e.g., "metricColumnA" or "metricColumnB" + String param = matcher.group(3).trim(); // e.g., "nominalEntries" or "p" + String metricParam = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + metric + "." + param; + levelToConfigMap.computeIfAbsent(level, k -> new TreeMap<>()).put(metricParam, entry.getValue()); + } } } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java index 502fa1cc76..bb1fc70afa 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java @@ -149,6 +149,10 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC // Aggregation types segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs)); + // Aggregation function parameters + segmentProcessorConfigBuilder.setAggregationFunctionParameters( + MergeTaskUtils.getAggregationFunctionParameters(configs)); + // Segment config segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs)); diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java index 731607784b..c60b86899e 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java @@ -49,8 +49,9 @@ public class MergeTaskUtilsTest { public void testGetTimeHandlerConfig() { TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("dateTime").build(); - Schema schema = new Schema.SchemaBuilder() - .addDateTime("dateTime", DataType.LONG, "1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss", "1:SECONDS").build(); + Schema schema = + new Schema.SchemaBuilder().addDateTime("dateTime", DataType.LONG, "1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss", + "1:SECONDS").build(); Map<String, String> taskConfig = new HashMap<>(); long expectedWindowStartMs = 1625097600000L; long expectedWindowEndMs = 1625184000000L; @@ -171,6 +172,23 @@ public class MergeTaskUtilsTest { } } + @Test + public void testGetAggregationFunctionParameters() { + Map<String, String> taskConfig = new HashMap<>(); + taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnA.param1", "value1"); + taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnA.param2", "value2"); + taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + "metricColumnB.param1", "value3"); + taskConfig.put("otherPrefix.metricColumnC.param1", "value1"); + taskConfig.put("aggregationFunction.metricColumnD.param2", "value2"); + Map<String, Map<String, String>> result = MergeTaskUtils.getAggregationFunctionParameters(taskConfig); + assertEquals(result.size(), 2); + assertTrue(result.containsKey("metricColumnA")); + assertTrue(result.containsKey("metricColumnB")); + assertEquals(result.get("metricColumnA").get("param1"), "value1"); + assertEquals(result.get("metricColumnA").get("param2"), "value2"); + assertEquals(result.get("metricColumnB").get("param1"), "value3"); + } + @Test public void testGetSegmentConfig() { Map<String, String> taskConfig = new HashMap<>(); @@ -206,12 +224,12 @@ public class MergeTaskUtilsTest { segmentZKMetadata.setCustomMap(Collections.emptyMap()); assertTrue(MergeTaskUtils.allowMerge(segmentZKMetadata)); - segmentZKMetadata - .setCustomMap(Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "false")); + segmentZKMetadata.setCustomMap( + Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "false")); assertTrue(MergeTaskUtils.allowMerge(segmentZKMetadata)); - segmentZKMetadata - .setCustomMap(Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "true")); + segmentZKMetadata.setCustomMap( + Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, "true")); assertFalse(MergeTaskUtils.allowMerge(segmentZKMetadata)); } } diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java index f73559728f..90827e2fa9 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java @@ -98,4 +98,28 @@ public class MergeRollupTaskUtilsTest { assertTrue(result.contains("dimension2"), "Expected set to contain 'dimension2'"); assertTrue(result.contains("dimension3"), "Expected set to contain 'dimension3'"); } + + @Test + public void testAggregationFunctionParameters() { + Map<String, String> taskConfig = new HashMap<>(); + taskConfig.put("hourly.aggregationFunctionParameters.metricColumnA.nominalEntries", "16384"); + taskConfig.put("hourly.aggregationFunctionParameters.metricColumnB.nominalEntries", "8192"); + taskConfig.put("daily.aggregationFunctionParameters.metricColumnA.nominalEntries", "8192"); + taskConfig.put("daily.aggregationFunctionParameters.metricColumnB.nominalEntries", "4096"); + + Map<String, Map<String, String>> levelToConfigMap = MergeRollupTaskUtils.getLevelToConfigMap(taskConfig); + assertEquals(levelToConfigMap.size(), 2); + + Map<String, String> hourlyConfig = levelToConfigMap.get("hourly"); + assertNotNull(hourlyConfig); + assertEquals(hourlyConfig.size(), 2); + assertEquals(hourlyConfig.get("aggregationFunctionParameters.metricColumnA.nominalEntries"), "16384"); + assertEquals(hourlyConfig.get("aggregationFunctionParameters.metricColumnB.nominalEntries"), "8192"); + + Map<String, String> dailyConfig = levelToConfigMap.get("daily"); + assertNotNull(dailyConfig); + assertEquals(dailyConfig.size(), 2); + assertEquals(dailyConfig.get("aggregationFunctionParameters.metricColumnA.nominalEntries"), "8192"); + assertEquals(dailyConfig.get("aggregationFunctionParameters.metricColumnB.nominalEntries"), "4096"); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org