This is an automated email from the ASF dual-hosted git repository.

pratik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ba014509414 Adding a new property minNumSegmentsPerTask for 
UpsertCompactMerge task (#17104)
ba014509414 is described below

commit ba014509414823d42651ac297a1de3a0e77bbb8d
Author: tarun11Mavani <[email protected]>
AuthorDate: Tue Nov 11 01:05:43 2025 +0530

    Adding a new property minNumSegmentsPerTask for UpsertCompactMerge task 
(#17104)
---
 .../apache/pinot/core/common/MinionConstants.java  | 12 ++++++
 .../UpsertCompactMergeTaskGenerator.java           | 39 ++++++++++--------
 .../segment/local/utils/TableConfigUtils.java      | 37 ++++++++++++++++-
 .../segment/local/utils/TableConfigUtilsTest.java  | 47 ++++++++++++++++++++++
 4 files changed, 117 insertions(+), 18 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java 
b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
index 293f4ed5784..1f784fb368b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java
@@ -316,6 +316,18 @@ public class MinionConstants {
      */
     public static final long DEFAULT_MAX_NUM_SEGMENTS_PER_TASK = 10;
 
+    /**
+     * Task config key for the minimum number of segments that a single UpsertCompactMerge task must merge.
+     * Candidate groups with fewer segments than this value are not scheduled.
+     */
+    public static final String MIN_NUM_SEGMENTS_PER_TASK_KEY = "minNumSegmentsPerTask";
+
+    /**
+     * Default minimum number of segments to process in a single task.
+     * Keeping this default at 2 means that the task is not run when only one segment is eligible for merging.
+     * If this is set to 1, this task can act as an UpsertCompaction (single-segment compaction) task as well.
+     */
+    public static final long DEFAULT_MIN_NUM_SEGMENTS_PER_TASK = 2;
+
     public static final String MERGED_SEGMENTS_ZK_SUFFIX = ".mergedSegments";
 
     public static final String MAX_ZK_CREATION_TIME_MILLIS_KEY = 
"maxZKCreationTimeMillis";
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index 13119e6f08f..5a5705bcf05 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -201,6 +201,9 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
       // Get max number of subtasks for this table
       int maxTasks = getAndUpdateMaxNumSubTasks(taskConfigs,
           MinionConstants.DEFAULT_TABLE_MAX_NUM_TASKS, tableNameWithType);
+      long minNumSegments = Long.parseLong(
+          
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MIN_NUM_SEGMENTS_PER_TASK_KEY,
+              
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MIN_NUM_SEGMENTS_PER_TASK)));
       for (Map.Entry<Integer, List<List<SegmentMergerMetadata>>> entry
           : 
segmentSelectionResult.getSegmentsForCompactMergeByPartition().entrySet()) {
         if (numTasks == maxTasks) {
@@ -211,9 +214,9 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
         if (groups.isEmpty()) {
           continue;
         }
-        // there are no groups with more than 1 segment to merge
-        // TODO this can be later removed if we want to just do single-segment 
compaction from this task
-        if (groups.get(0).size() <= 1) {
+        // Only the first group of each partition is scheduled (see TODO below), so skip the partition if that
+        // group has fewer than minNumSegmentsPerTask segments.
+        if (groups.get(0).size() < minNumSegments) {
           continue;
         }
         // TODO see if multiple groups of same partition can be added
@@ -254,6 +257,9 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
     long maxNumSegments = Long.parseLong(
         
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MAX_NUM_SEGMENTS_PER_TASK_KEY,
             
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MAX_NUM_SEGMENTS_PER_TASK)));
+    long minNumSegments = Long.parseLong(
+        
taskConfigs.getOrDefault(MinionConstants.UpsertCompactMergeTask.MIN_NUM_SEGMENTS_PER_TASK_KEY,
+            
String.valueOf(MinionConstants.UpsertCompactMergeTask.DEFAULT_MIN_NUM_SEGMENTS_PER_TASK)));
 
     // default to Long.MAX_VALUE to avoid size-based compaction by default
     long outputSegmentMaxSizeInBytes = Long.MAX_VALUE;
@@ -381,20 +387,21 @@ public class UpsertCompactMergeTaskGenerator extends 
BaseTaskGenerator {
 
       // Sort groups by total invalidDocs in descending order, if invalidDocs 
count are same, prefer group with
       // higher number of small segments in them
-      // remove the groups having only 1 segments in them
-      // TODO this check can be later removed if we want single-segment 
compaction from this task itself
+      // remove the groups having less than minNumSegments segments in them
       List<List<SegmentMergerMetadata>> compactMergeGroups =
-          groups.stream().filter(x -> x.size() > 1).sorted((group1, group2) -> 
{
-            long invalidDocsSum1 = 
group1.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
-            long invalidDocsSum2 = 
group2.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
-            if (invalidDocsSum2 < invalidDocsSum1) {
-              return -1;
-            } else if (invalidDocsSum2 == invalidDocsSum1) {
-              return Long.compare(group2.size(), group1.size());
-            } else {
-              return 1;
-            }
-          }).collect(Collectors.toList());
+          groups.stream()
+              .filter(x -> x.size() >= minNumSegments)
+              .sorted((group1, group2) -> {
+                long invalidDocsSum1 = 
group1.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
+                long invalidDocsSum2 = 
group2.stream().mapToLong(SegmentMergerMetadata::getInvalidDocIds).sum();
+                if (invalidDocsSum2 < invalidDocsSum1) {
+                  return -1;
+                } else if (invalidDocsSum2 == invalidDocsSum1) {
+                  return Long.compare(group2.size(), group1.size());
+                } else {
+                  return 1;
+                }
+              }).collect(Collectors.toList());
 
       if (!compactMergeGroups.isEmpty()) {
         groupedSegments.put(partitionID, compactMergeGroups);
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 16c89b24c9b..9ddbe74eed2 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -190,6 +190,8 @@ public final class TableConfigUtils {
       validatePartialUpsertStrategies(tableConfig, schema);
     }
 
+    validateTaskConfig(tableConfig);
+
     if (_enforcePoolBasedAssignment) {
       validateInstancePoolsNReplicaGroups(tableConfig);
     }
@@ -1136,6 +1138,37 @@ public final class TableConfigUtils {
     }
   }
 
+  /**
+   * Validates the minion task configuration of the table.
+   *
+   * <p>Currently this validates the {@code minNumSegmentsPerTask} property of the UpsertCompactMerge task:
+   * <ul>
+   *   <li>when present, it must parse as a {@code long} and be at least 1;</li>
+   *   <li>when the UpsertCompaction task is configured on the same table, it must be greater than 1. With a value
+   *   of 1 the UpsertCompactMerge task can also compact single segments, so the two tasks would overlap.</li>
+   * </ul>
+   *
+   * @param tableConfig table config to validate
+   * @throws IllegalStateException if the task configuration is invalid or conflicting
+   */
+  @VisibleForTesting
+  static void validateTaskConfig(TableConfig tableConfig) {
+    TableTaskConfig taskConfig = tableConfig.getTaskConfig();
+    if (taskConfig == null || taskConfig.getTaskTypeConfigsMap() == null) {
+      return;
+    }
+    Map<String, Map<String, String>> taskTypeConfigsMap = taskConfig.getTaskTypeConfigsMap();
+    Map<String, String> upsertCompactMergeConfig = taskTypeConfigsMap.get(UPSERT_COMPACT_MERGE_TASK_TYPE);
+    if (upsertCompactMergeConfig == null) {
+      return;
+    }
+
+    // NOTE(review): key and default (2) mirror MinionConstants.UpsertCompactMergeTask; presumably hard-coded because
+    // that class is not visible from this module — keep them in sync.
+    String minNumSegmentsPerTaskKey = "minNumSegmentsPerTask";
+    String minNumSegmentsValue = upsertCompactMergeConfig.get(minNumSegmentsPerTaskKey);
+    long minNumSegments = 2;
+    if (minNumSegmentsValue != null) {
+      // Parse exactly like the task generator does (no trimming), so anything accepted here is also accepted there.
+      try {
+        minNumSegments = Long.parseLong(minNumSegmentsValue);
+      } catch (NumberFormatException e) {
+        throw new IllegalStateException(String.format("Invalid value '%s' for %s.%s, expected a positive integer",
+            minNumSegmentsValue, UPSERT_COMPACT_MERGE_TASK_TYPE, minNumSegmentsPerTaskKey), e);
+      }
+      Preconditions.checkState(minNumSegments >= 1, "%s.%s must be a positive integer, got: %s",
+          UPSERT_COMPACT_MERGE_TASK_TYPE, minNumSegmentsPerTaskKey, minNumSegments);
+    }
+
+    if (taskTypeConfigsMap.containsKey(UPSERT_COMPACTION_TASK_TYPE)) {
+      // minNumSegments >= 1 is guaranteed above, so failing this check means it is exactly 1.
+      Preconditions.checkState(minNumSegments > 1,
+          "When %s.%s is set to 1, %s should not be configured to avoid nondeterministic behavior. "
+              + "Please remove %s configuration or set %s to a value greater than 1.",
+          UPSERT_COMPACT_MERGE_TASK_TYPE, minNumSegmentsPerTaskKey, UPSERT_COMPACTION_TASK_TYPE,
+          UPSERT_COMPACTION_TASK_TYPE, minNumSegmentsPerTaskKey);
+    }
+  }
+
   /**
    * Validates the tier configs
    * Checks for the right segmentSelectorType and its required properties
@@ -1372,7 +1405,7 @@ public final class TableConfigUtils {
           }
           String column = columnPair.getColumn();
           if (!column.equals(AggregationFunctionColumnPair.STAR)) {
-              aggregatedColumns.add(column);
+            aggregatedColumns.add(column);
           } else if (columnPair.getFunctionType() != 
AggregationFunctionType.COUNT) {
             throw new IllegalStateException("Non-COUNT function set the column 
as '*' in the functionColumnPair: "
                 + functionColumnPair + ". Please configure an actual column 
for the function");
@@ -1401,7 +1434,7 @@ public final class TableConfigUtils {
           }
           String column = columnPair.getColumn();
           if (!column.equals(AggregationFunctionColumnPair.STAR)) {
-              aggregatedColumns.add(column);
+            aggregatedColumns.add(column);
           } else if (columnPair.getFunctionType() != 
AggregationFunctionType.COUNT) {
             throw new IllegalStateException("Non-COUNT function set the column 
as '*' in the aggregationConfig for "
                 + "function: " + aggregationConfig.getAggregationFunction()
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 30ea8126ace..5bf25d9fd95 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -3562,4 +3562,51 @@ public class TableConfigUtilsTest {
       assertEquals(e.getMessage(), "MetadataTTL: 50000(ms) must be smaller 
than the minimum segmentAge: 50000(ms)");
     }
   }
+
+  @Test
+  public void testValidateTaskConfigConflict() {
+    // minNumSegmentsPerTask=1 with both UpsertCompactMergeTask and UpsertCompactionTask configured must fail
+    TableConfig conflictingConfig = buildTableConfigWithUpsertTasks("1", true);
+    assertThrows(IllegalStateException.class, () -> TableConfigUtils.validateTaskConfig(conflictingConfig));
+
+    // minNumSegmentsPerTask=0 is not greater than 1 either, so it must also fail when both tasks are configured
+    TableConfig zeroMinConfig = buildTableConfigWithUpsertTasks("0", true);
+    assertThrows(IllegalStateException.class, () -> TableConfigUtils.validateTaskConfig(zeroMinConfig));
+
+    // minNumSegmentsPerTask=2 with both tasks configured passes
+    TableConfigUtils.validateTaskConfig(buildTableConfigWithUpsertTasks("2", true));
+
+    // When minNumSegmentsPerTask is unset the default (2) applies, so both tasks may be configured together
+    TableConfigUtils.validateTaskConfig(buildTableConfigWithUpsertTasks(null, true));
+
+    // Only UpsertCompactMergeTask with minNumSegmentsPerTask=1 (without UpsertCompactionTask) passes
+    TableConfigUtils.validateTaskConfig(buildTableConfigWithUpsertTasks("1", false));
+  }
+
+  /**
+   * Builds a realtime table config with an UpsertCompactMergeTask config and, optionally, an UpsertCompactionTask
+   * config. Fresh maps are created on every call so that test cases never share mutable state.
+   *
+   * @param minNumSegmentsPerTask value for UpsertCompactMergeTask's minNumSegmentsPerTask, or null to leave it unset
+   * @param withUpsertCompactionTask whether to also configure UpsertCompactionTask
+   * @return the built table config
+   */
+  private TableConfig buildTableConfigWithUpsertTasks(String minNumSegmentsPerTask,
+      boolean withUpsertCompactionTask) {
+    Map<String, String> upsertCompactMergeConfig = new HashMap<>();
+    upsertCompactMergeConfig.put("bufferTimePeriod", "5d");
+    if (minNumSegmentsPerTask != null) {
+      upsertCompactMergeConfig.put("minNumSegmentsPerTask", minNumSegmentsPerTask);
+    }
+
+    Map<String, Map<String, String>> taskTypeConfigsMap = new HashMap<>();
+    taskTypeConfigsMap.put("UpsertCompactMergeTask", upsertCompactMergeConfig);
+    if (withUpsertCompactionTask) {
+      Map<String, String> upsertCompactionConfig = new HashMap<>();
+      upsertCompactionConfig.put("bufferTimePeriod", "5d");
+      taskTypeConfigsMap.put("UpsertCompactionTask", upsertCompactionConfig);
+    }
+
+    return new TableConfigBuilder(TableType.REALTIME)
+        .setTableName(TABLE_NAME)
+        .setTaskConfig(new org.apache.pinot.spi.config.table.TableTaskConfig(taskTypeConfigsMap))
+        .build();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to