This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 442c0fc713 Configurable sketch accuracy in merge rollup task (#14373)
442c0fc713 is described below

commit 442c0fc71343d8f8ff152a818867f03d240bb758
Author: David Cromberge <davecrombe...@gmail.com>
AuthorDate: Tue Dec 10 11:05:46 2024 +0200

    Configurable sketch accuracy in merge rollup task (#14373)
    
    * Configurable sketch accuracy in merge rollup task
    
    * Run mvn spotless:apply
---
 .../apache/pinot/core/common/MinionConstants.java  |  1 +
 .../DistinctCountCPCSketchAggregator.java          |  3 ++-
 .../aggregator/DistinctCountHLLAggregator.java     |  3 ++-
 .../DistinctCountThetaSketchAggregator.java        | 24 ++++++++++++-----
 .../aggregator/DistinctCountULLAggregator.java     |  3 ++-
 .../aggregator/IntegerTupleSketchAggregator.java   | 21 +++++++++++++--
 .../processing/aggregator/MaxValueAggregator.java  |  3 ++-
 .../processing/aggregator/MinValueAggregator.java  |  3 ++-
 .../processing/aggregator/SumValueAggregator.java  |  3 ++-
 .../processing/aggregator/ValueAggregator.java     |  5 +++-
 .../framework/SegmentProcessorConfig.java          | 23 +++++++++++++++--
 .../segment/processing/reducer/ReducerFactory.java |  3 ++-
 .../segment/processing/reducer/RollupReducer.java  | 20 +++++++++++----
 .../pinot/plugin/minion/tasks/MergeTaskUtils.java  | 22 ++++++++++++++++
 .../tasks/mergerollup/MergeRollupTaskExecutor.java |  4 +++
 .../tasks/mergerollup/MergeRollupTaskUtils.java    | 15 +++++++++++
 .../RealtimeToOfflineSegmentsTaskExecutor.java     |  4 +++
 .../plugin/minion/tasks/MergeTaskUtilsTest.java    | 30 +++++++++++++++++-----
 .../mergerollup/MergeRollupTaskUtilsTest.java      | 24 +++++++++++++++++
 19 files changed, 185 insertions(+), 29 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 3c4b9b322f..48349099b4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -103,6 +103,7 @@ public class MinionConstants {
     // Merge config
     public static final String MERGE_TYPE_KEY = "mergeType";
     public static final String AGGREGATION_TYPE_KEY_SUFFIX = 
".aggregationType";
+    public static final String AGGREGATION_FUNCTION_PARAMETERS_PREFIX = 
"aggregationFunctionParameters.";
     public static final String MODE = "mode";
     public static final String PROCESS_FROM_WATERMARK_MODE = 
"processFromWatermark";
     public static final String PROCESS_ALL_MODE = "processAll";
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
index 82e9a74161..73985f564d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountCPCSketchAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.processing.aggregator;
 
+import java.util.Map;
 import org.apache.datasketches.cpc.CpcSketch;
 import org.apache.datasketches.cpc.CpcUnion;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
@@ -30,7 +31,7 @@ public class DistinctCountCPCSketchAggregator implements 
ValueAggregator {
   }
 
   @Override
-  public Object aggregate(Object value1, Object value2) {
+  public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
     CpcSketch first = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value1);
     CpcSketch second = 
ObjectSerDeUtils.DATA_SKETCH_CPC_SER_DE.deserialize((byte[]) value2);
     CpcSketch result;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java
index 4eecbe3696..940d356a95 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountHLLAggregator.java
@@ -20,12 +20,13 @@ package org.apache.pinot.core.segment.processing.aggregator;
 
 import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLog;
+import java.util.Map;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 
 
 public class DistinctCountHLLAggregator implements ValueAggregator {
   @Override
-  public Object aggregate(Object value1, Object value2) {
+  public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
     try {
       HyperLogLog first = 
ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) value1);
       HyperLogLog second = 
ObjectSerDeUtils.HYPER_LOG_LOG_SER_DE.deserialize((byte[]) value2);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
index b11f7d7b00..f22e38ed3c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountThetaSketchAggregator.java
@@ -18,26 +18,38 @@
  */
 package org.apache.pinot.core.segment.processing.aggregator;
 
+import java.util.Map;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.datasketches.theta.Union;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
 import org.apache.pinot.spi.utils.CommonConstants;
 
 
 public class DistinctCountThetaSketchAggregator implements ValueAggregator {
 
-  private final Union _union;
-
   public DistinctCountThetaSketchAggregator() {
-    // TODO: Handle configurable nominal entries
-    _union = 
Union.builder().setNominalEntries(CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES).buildUnion();
   }
 
   @Override
-  public Object aggregate(Object value1, Object value2) {
+  public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
+    String nominalEntriesParam = 
functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
+
+    int sketchNominalEntries;
+
+    // Prefer an explicitly configured nominal entries value (must parse as an int)
+    if (nominalEntriesParam != null) {
+      sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
+    } else {
+      // If the functionParameters don't have an explicit nominal entries value set,
+      // use the default value for nominal entries
+      sketchNominalEntries = 
CommonConstants.Helix.DEFAULT_THETA_SKETCH_NOMINAL_ENTRIES;
+    }
+
+    Union union = 
Union.builder().setNominalEntries(sketchNominalEntries).buildUnion();
     Sketch first = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value1);
     Sketch second = 
ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.deserialize((byte[]) value2);
-    Sketch result = _union.union(first, second);
+    Sketch result = union.union(first, second);
     return ObjectSerDeUtils.DATA_SKETCH_THETA_SER_DE.serialize(result);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java
index 2a51ac052b..70469f8cf4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/DistinctCountULLAggregator.java
@@ -19,12 +19,13 @@
 package org.apache.pinot.core.segment.processing.aggregator;
 
 import com.dynatrace.hash4j.distinctcount.UltraLogLog;
+import java.util.Map;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
 
 
 public class DistinctCountULLAggregator implements ValueAggregator {
   @Override
-  public Object aggregate(Object value1, Object value2) {
+  public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
     UltraLogLog first = 
ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize((byte[]) value1);
     UltraLogLog second = 
ObjectSerDeUtils.ULTRA_LOG_LOG_OBJECT_SER_DE.deserialize((byte[]) value2);
     // add to the one with a larger P and return that
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
index 8bdf7f8a86..b7df4c05fe 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/IntegerTupleSketchAggregator.java
@@ -18,11 +18,14 @@
  */
 package org.apache.pinot.core.segment.processing.aggregator;
 
+import java.util.Map;
 import org.apache.datasketches.tuple.Sketch;
 import org.apache.datasketches.tuple.Union;
 import org.apache.datasketches.tuple.aninteger.IntegerSummary;
 import org.apache.datasketches.tuple.aninteger.IntegerSummarySetOperations;
 import org.apache.pinot.core.common.ObjectSerDeUtils;
+import org.apache.pinot.segment.spi.Constants;
+import org.apache.pinot.spi.utils.CommonConstants;
 
 
 public class IntegerTupleSketchAggregator implements ValueAggregator {
@@ -33,10 +36,24 @@ public class IntegerTupleSketchAggregator implements 
ValueAggregator {
   }
 
   @Override
-  public Object aggregate(Object value1, Object value2) {
+  public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
+    String nominalEntriesParam = 
functionParameters.get(Constants.THETA_TUPLE_SKETCH_NOMINAL_ENTRIES);
+
+    int sketchNominalEntries;
+
+    // Prefer an explicitly configured nominal entries value (must parse as an int)
+    if (nominalEntriesParam != null) {
+      sketchNominalEntries = Integer.parseInt(nominalEntriesParam);
+    } else {
+      // If the functionParameters don't have an explicit nominal entries value set,
+      // fall back to 2^DEFAULT_TUPLE_SKETCH_LGK nominal entries
+      sketchNominalEntries = (int) Math.pow(2, 
CommonConstants.Helix.DEFAULT_TUPLE_SKETCH_LGK);
+    }
+
     Sketch<IntegerSummary> first = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value1);
     Sketch<IntegerSummary> second = 
ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.deserialize((byte[]) value2);
-    Sketch<IntegerSummary> result = new Union<>(new 
IntegerSummarySetOperations(_mode, _mode)).union(first, second);
+    Sketch<IntegerSummary> result =
+        new Union<>(sketchNominalEntries, new 
IntegerSummarySetOperations(_mode, _mode)).union(first, second);
     return ObjectSerDeUtils.DATA_SKETCH_INT_TUPLE_SER_DE.serialize(result);
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
index 6a231b036c..1c4fa5a498 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MaxValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.processing.aggregator;
 
+import java.util.Map;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -33,7 +34,7 @@ public class MaxValueAggregator implements ValueAggregator {
   }
 
   @Override
-  public Object aggregate(Object value1, Object value2) {
+  public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
     Object result;
     switch (_dataType) {
       case INT:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
index 9352cc99d0..8914dfa7c8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/MinValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.processing.aggregator;
 
+import java.util.Map;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -33,7 +34,7 @@ public class MinValueAggregator implements ValueAggregator {
   }
 
   @Override
-  public Object aggregate(Object value1, Object value2) {
+  public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
     Object result;
     switch (_dataType) {
       case INT:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
index 0570cca1b5..8b7d57d889 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/SumValueAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.segment.processing.aggregator;
 
+import java.util.Map;
 import org.apache.pinot.spi.data.FieldSpec;
 
 
@@ -33,7 +34,7 @@ public class SumValueAggregator implements ValueAggregator {
   }
 
   @Override
-  public Object aggregate(Object value1, Object value2) {
+  public Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters) {
     Object result;
     switch (_dataType) {
       case INT:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
index 016e0fb091..70d90dd100 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/aggregator/ValueAggregator.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.segment.processing.aggregator;
 
+import java.util.Map;
+
+
 /**
  * Interface for value aggregator
  */
@@ -27,5 +30,5 @@ public interface ValueAggregator {
    * Given two values, return the aggregated value
    * @return aggregated value given two column values
    */
-  Object aggregate(Object value1, Object value2);
+  Object aggregate(Object value1, Object value2, Map<String, String> 
functionParameters);
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
index 053f78b6f3..56009608ee 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorConfig.java
@@ -44,12 +44,14 @@ public class SegmentProcessorConfig {
   private final List<PartitionerConfig> _partitionerConfigs;
   private final MergeType _mergeType;
   private final Map<String, AggregationFunctionType> _aggregationTypes;
+  private final Map<String, Map<String, String>> 
_aggregationFunctionParameters;
   private final SegmentConfig _segmentConfig;
   private final Consumer<Object> _progressObserver;
 
   private SegmentProcessorConfig(TableConfig tableConfig, Schema schema, 
TimeHandlerConfig timeHandlerConfig,
       List<PartitionerConfig> partitionerConfigs, MergeType mergeType,
-      Map<String, AggregationFunctionType> aggregationTypes, SegmentConfig 
segmentConfig,
+      Map<String, AggregationFunctionType> aggregationTypes,
+      Map<String, Map<String, String>> aggregationFunctionParameters, 
SegmentConfig segmentConfig,
       Consumer<Object> progressObserver) {
     TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
     _tableConfig = tableConfig;
@@ -58,6 +60,7 @@ public class SegmentProcessorConfig {
     _partitionerConfigs = partitionerConfigs;
     _mergeType = mergeType;
     _aggregationTypes = aggregationTypes;
+    _aggregationFunctionParameters = aggregationFunctionParameters;
     _segmentConfig = segmentConfig;
     _progressObserver = (progressObserver != null) ? progressObserver : p -> {
       // Do nothing.
@@ -106,6 +109,13 @@ public class SegmentProcessorConfig {
     return _aggregationTypes;
   }
 
+  /**
+   * The per-column aggregation function parameters (column name to parameter map) for the
+   * SegmentProcessorFramework's reduce phase with ROLLUP merge type
+   */
+  public Map<String, Map<String, String>> getAggregationFunctionParameters() {
+    return _aggregationFunctionParameters;
+  }
+
   /**
    * The SegmentConfig for the SegmentProcessorFramework's reduce phase
    */
@@ -134,6 +144,7 @@ public class SegmentProcessorConfig {
     private List<PartitionerConfig> _partitionerConfigs;
     private MergeType _mergeType;
     private Map<String, AggregationFunctionType> _aggregationTypes;
+    private Map<String, Map<String, String>> _aggregationFunctionParameters;
     private SegmentConfig _segmentConfig;
     private Consumer<Object> _progressObserver;
 
@@ -167,6 +178,11 @@ public class SegmentProcessorConfig {
       return this;
     }
 
+    public Builder setAggregationFunctionParameters(Map<String, Map<String, 
String>> aggregationFunctionParameters) {
+      _aggregationFunctionParameters = aggregationFunctionParameters;
+      return this;
+    }
+
     public Builder setSegmentConfig(SegmentConfig segmentConfig) {
       _segmentConfig = segmentConfig;
       return this;
@@ -193,11 +209,14 @@ public class SegmentProcessorConfig {
       if (_aggregationTypes == null) {
         _aggregationTypes = Collections.emptyMap();
       }
+      if (_aggregationFunctionParameters == null) {
+        _aggregationFunctionParameters = Collections.emptyMap();
+      }
       if (_segmentConfig == null) {
         _segmentConfig = new SegmentConfig.Builder().build();
       }
       return new SegmentProcessorConfig(_tableConfig, _schema, 
_timeHandlerConfig, _partitionerConfigs, _mergeType,
-          _aggregationTypes, _segmentConfig, _progressObserver);
+          _aggregationTypes, _aggregationFunctionParameters, _segmentConfig, 
_progressObserver);
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
index a205500e34..59fca478fa 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/ReducerFactory.java
@@ -38,7 +38,8 @@ public class ReducerFactory {
       case CONCAT:
         return new ConcatReducer(fileManager);
       case ROLLUP:
-        return new RollupReducer(partitionId, fileManager, 
processorConfig.getAggregationTypes(), reducerOutputDir);
+        return new RollupReducer(partitionId, fileManager, 
processorConfig.getAggregationTypes(),
+            processorConfig.getAggregationFunctionParameters(), 
reducerOutputDir);
       case DEDUP:
         return new DedupReducer(partitionId, fileManager, reducerOutputDir);
       default:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
index ae88120f20..fdd1a67173 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.segment.processing.reducer;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
@@ -47,14 +48,17 @@ public class RollupReducer implements Reducer {
   private final String _partitionId;
   private final GenericRowFileManager _fileManager;
   private final Map<String, AggregationFunctionType> _aggregationTypes;
+  private final Map<String, Map<String, String>> 
_aggregationFunctionParameters;
   private final File _reducerOutputDir;
   private GenericRowFileManager _rollupFileManager;
 
   public RollupReducer(String partitionId, GenericRowFileManager fileManager,
-      Map<String, AggregationFunctionType> aggregationTypes, File 
reducerOutputDir) {
+      Map<String, AggregationFunctionType> aggregationTypes,
+      Map<String, Map<String, String>> aggregationFunctionParameters, File 
reducerOutputDir) {
     _partitionId = partitionId;
     _fileManager = fileManager;
     _aggregationTypes = aggregationTypes;
+    _aggregationFunctionParameters = aggregationFunctionParameters;
     _reducerOutputDir = reducerOutputDir;
   }
 
@@ -91,7 +95,8 @@ public class RollupReducer implements Reducer {
     for (FieldSpec fieldSpec : fieldSpecs) {
       if (fieldSpec.getFieldType() == FieldType.METRIC) {
         aggregatorContextList.add(new AggregatorContext(fieldSpec,
-            _aggregationTypes.getOrDefault(fieldSpec.getName(), 
DEFAULT_AGGREGATOR_TYPE)));
+            _aggregationTypes.getOrDefault(fieldSpec.getName(), 
DEFAULT_AGGREGATOR_TYPE),
+            _aggregationFunctionParameters.getOrDefault(fieldSpec.getName(), 
Collections.emptyMap())));
       }
     }
 
@@ -159,7 +164,8 @@ public class RollupReducer implements Reducer {
       } else {
         // Non-null field, aggregate the value
         aggregatedRow.putValue(column,
-            
aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), 
rowToAggregate.getValue(column)));
+            
aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), 
rowToAggregate.getValue(column),
+                aggregatorContext._functionParameters));
       }
     }
   }
@@ -169,17 +175,21 @@ public class RollupReducer implements Reducer {
     for (AggregatorContext aggregatorContext : aggregatorContextList) {
       String column = aggregatorContext._column;
       aggregatedRow.putValue(column,
-          
aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), 
rowToAggregate.getValue(column)));
+          
aggregatorContext._aggregator.aggregate(aggregatedRow.getValue(column), 
rowToAggregate.getValue(column),
+              aggregatorContext._functionParameters));
     }
   }
 
   private static class AggregatorContext {
     final String _column;
     final ValueAggregator _aggregator;
+    final Map<String, String> _functionParameters;
 
-    AggregatorContext(FieldSpec fieldSpec, AggregationFunctionType 
aggregationType) {
+    AggregatorContext(FieldSpec fieldSpec, AggregationFunctionType 
aggregationType,
+        Map<String, String> functionParameters) {
       _column = fieldSpec.getName();
       _aggregator = ValueAggregatorFactory.getValueAggregator(aggregationType, 
fieldSpec.getDataType());
+      _functionParameters = functionParameters;
     }
   }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
index 34bf2e5ecf..43f951629b 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtils.java
@@ -137,6 +137,28 @@ public class MergeTaskUtils {
     return aggregationTypes;
   }
 
+  /**
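+   * Returns a map from column name to its aggregation function parameters, parsed from task config keys
+   * of the form {@code aggregationFunctionParameters.<column>.<paramName>}. Keys without a parameter name are ignored.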
+   */
+  public static Map<String, Map<String, String>> 
getAggregationFunctionParameters(Map<String, String> taskConfig) {
+    Map<String, Map<String, String>> aggregationFunctionParameters = new 
HashMap<>();
+    String prefix = MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX;
+
+    for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (key.startsWith(prefix)) {
+        String[] parts = key.substring(prefix.length()).split("\\.", 2);
+        if (parts.length == 2) {
+          String metricColumn = parts[0];
+          String paramName = parts[1];
+          aggregationFunctionParameters.computeIfAbsent(metricColumn, k -> new 
HashMap<>()).put(paramName, value);
+        }
+      }
+    }
+    return aggregationFunctionParameters;
+  }
+
   /**
    * Returns the segment config based on the task config.
    */
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
index bebedb3ff2..455849f648 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java
@@ -91,6 +91,10 @@ public class MergeRollupTaskExecutor extends 
BaseMultipleSegmentsConversionExecu
     // Aggregation types
     
segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs));
 
+    // Aggregation function parameters
+    segmentProcessorConfigBuilder.setAggregationFunctionParameters(
+        MergeTaskUtils.getAggregationFunctionParameters(configs));
+
     // Segment config
     
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
 
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
index c3c7720e1b..b040b54d53 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtils.java
@@ -23,6 +23,8 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.MergeTask;
@@ -51,12 +53,25 @@ public class MergeRollupTaskUtils {
    */
   public static Map<String, Map<String, String>> 
getLevelToConfigMap(Map<String, String> taskConfig) {
     Map<String, Map<String, String>> levelToConfigMap = new TreeMap<>();
+
+    // Matches level-scoped keys of the form <level>.aggregationFunctionParameters.<column>.<param>
+    Pattern pattern = 
Pattern.compile("(\\w+)\\.aggregationFunctionParameters\\.(\\w+)\\.(\\w+)");
+
     for (Map.Entry<String, String> entry : taskConfig.entrySet()) {
       String key = entry.getKey();
       for (String configKey : VALID_CONFIG_KEYS) {
         if (key.endsWith(configKey)) {
           String level = key.substring(0, key.length() - configKey.length() - 
1);
           levelToConfigMap.computeIfAbsent(level, k -> new 
TreeMap<>()).put(configKey, entry.getValue());
+        } else {
+          Matcher matcher = pattern.matcher(key);
+          if (matcher.matches()) {
+            String level = matcher.group(1).trim();  // e.g., "1day" or "1hour"
+            String metric = matcher.group(2).trim(); // e.g., "metricColumnA" 
or "metricColumnB"
+            String param = matcher.group(3).trim();  // e.g., "nominalEntries" 
or "p"
+            String metricParam = 
MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + metric + "." + param;
+            levelToConfigMap.computeIfAbsent(level, k -> new 
TreeMap<>()).put(metricParam, entry.getValue());
+          }
         }
       }
     }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
index 502fa1cc76..bb1fc70afa 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -149,6 +149,10 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends 
BaseMultipleSegmentsC
     // Aggregation types
     
segmentProcessorConfigBuilder.setAggregationTypes(MergeTaskUtils.getAggregationTypes(configs));
 
+    // Aggregation function parameters
+    segmentProcessorConfigBuilder.setAggregationFunctionParameters(
+        MergeTaskUtils.getAggregationFunctionParameters(configs));
+
     // Segment config
     
segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
 
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
index 731607784b..c60b86899e 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/MergeTaskUtilsTest.java
@@ -49,8 +49,9 @@ public class MergeTaskUtilsTest {
   public void testGetTimeHandlerConfig() {
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("dateTime").build();
-    Schema schema = new Schema.SchemaBuilder()
-        .addDateTime("dateTime", DataType.LONG, 
"1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss", "1:SECONDS").build();
+    Schema schema =
+        new Schema.SchemaBuilder().addDateTime("dateTime", DataType.LONG, 
"1:SECONDS:SIMPLE_DATE_FORMAT:yyyyMMddHHmmss",
+            "1:SECONDS").build();
     Map<String, String> taskConfig = new HashMap<>();
     long expectedWindowStartMs = 1625097600000L;
     long expectedWindowEndMs = 1625184000000L;
@@ -171,6 +172,23 @@ public class MergeTaskUtilsTest {
     }
   }
 
+  @Test
+  public void testGetAggregationFunctionParameters() {
+    Map<String, String> taskConfig = new HashMap<>();
+    taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + 
"metricColumnA.param1", "value1");
+    taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + 
"metricColumnA.param2", "value2");
+    taskConfig.put(MergeTask.AGGREGATION_FUNCTION_PARAMETERS_PREFIX + 
"metricColumnB.param1", "value3");
+    taskConfig.put("otherPrefix.metricColumnC.param1", "value1");
+    taskConfig.put("aggregationFunction.metricColumnD.param2", "value2");
+    Map<String, Map<String, String>> result = 
MergeTaskUtils.getAggregationFunctionParameters(taskConfig);
+    assertEquals(result.size(), 2);
+    assertTrue(result.containsKey("metricColumnA"));
+    assertTrue(result.containsKey("metricColumnB"));
+    assertEquals(result.get("metricColumnA").get("param1"), "value1");
+    assertEquals(result.get("metricColumnA").get("param2"), "value2");
+    assertEquals(result.get("metricColumnB").get("param1"), "value3");
+  }
+
   @Test
   public void testGetSegmentConfig() {
     Map<String, String> taskConfig = new HashMap<>();
@@ -206,12 +224,12 @@ public class MergeTaskUtilsTest {
     segmentZKMetadata.setCustomMap(Collections.emptyMap());
     assertTrue(MergeTaskUtils.allowMerge(segmentZKMetadata));
 
-    segmentZKMetadata
-        
.setCustomMap(Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY,
 "false"));
+    segmentZKMetadata.setCustomMap(
+        
Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, 
"false"));
     assertTrue(MergeTaskUtils.allowMerge(segmentZKMetadata));
 
-    segmentZKMetadata
-        
.setCustomMap(Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY,
 "true"));
+    segmentZKMetadata.setCustomMap(
+        
Collections.singletonMap(MergeTask.SEGMENT_ZK_METADATA_SHOULD_NOT_MERGE_KEY, 
"true"));
     assertFalse(MergeTaskUtils.allowMerge(segmentZKMetadata));
   }
 }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
index f73559728f..90827e2fa9 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskUtilsTest.java
@@ -98,4 +98,28 @@ public class MergeRollupTaskUtilsTest {
     assertTrue(result.contains("dimension2"), "Expected set to contain 
'dimension2'");
     assertTrue(result.contains("dimension3"), "Expected set to contain 
'dimension3'");
   }
+
+  @Test
+  public void testAggregationFunctionParameters() {
+    Map<String, String> taskConfig = new HashMap<>();
+    
taskConfig.put("hourly.aggregationFunctionParameters.metricColumnA.nominalEntries",
 "16384");
+    
taskConfig.put("hourly.aggregationFunctionParameters.metricColumnB.nominalEntries",
 "8192");
+    
taskConfig.put("daily.aggregationFunctionParameters.metricColumnA.nominalEntries",
 "8192");
+    
taskConfig.put("daily.aggregationFunctionParameters.metricColumnB.nominalEntries",
 "4096");
+
+    Map<String, Map<String, String>> levelToConfigMap = 
MergeRollupTaskUtils.getLevelToConfigMap(taskConfig);
+    assertEquals(levelToConfigMap.size(), 2);
+
+    Map<String, String> hourlyConfig = levelToConfigMap.get("hourly");
+    assertNotNull(hourlyConfig);
+    assertEquals(hourlyConfig.size(), 2);
+    
assertEquals(hourlyConfig.get("aggregationFunctionParameters.metricColumnA.nominalEntries"),
 "16384");
+    
assertEquals(hourlyConfig.get("aggregationFunctionParameters.metricColumnB.nominalEntries"),
 "8192");
+
+    Map<String, String> dailyConfig = levelToConfigMap.get("daily");
+    assertNotNull(dailyConfig);
+    assertEquals(dailyConfig.size(), 2);
+    
assertEquals(dailyConfig.get("aggregationFunctionParameters.metricColumnA.nominalEntries"),
 "8192");
+    
assertEquals(dailyConfig.get("aggregationFunctionParameters.metricColumnB.nominalEntries"),
 "4096");
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to