This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new bad572ecb6 [MergeRollupTask] include partition info into segment name (#9815)
bad572ecb6 is described below

commit bad572ecb636d766862b048be47c9b6bfa8ab4ef
Author: Haitao Zhang <hai...@startree.ai>
AuthorDate: Thu Nov 17 18:11:23 2022 -0800

    [MergeRollupTask] include partition info into segment name (#9815)
---
 .../mergerollup/MergeRollupTaskGenerator.java      | 32 ++++++++++++++++------
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 0983e9f4cd..28043c2f60 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -105,6 +105,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
   private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
   private static final int DEFAULT_NUM_PARALLEL_BUCKETS = 1;
   private static final String REFRESH = "REFRESH";
+  private static final String DELIMITER_IN_SEGMENT_NAME = "_";
 
   // This is the metric that keeps track of the task delay in the number of 
time buckets. For example, if we see this
   // number to be 7 and merge task is configured with "bucketTimePeriod = 1d", 
this means that we have 7 days of
@@ -349,7 +350,7 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
           for (List<SegmentZKMetadata> selectedSegmentsPerBucket : 
selectedSegmentsForAllBuckets) {
             pinotTaskConfigsForTable.addAll(
                 createPinotTaskConfigs(selectedSegmentsPerBucket, 
offlineTableName, maxNumRecordsPerTask, mergeLevel,
-                    mergeConfigs, taskConfigs));
+                    null, mergeConfigs, taskConfigs));
           }
         } else {
           // For partitioned table, schedule separate tasks for each 
partitionId (partitionId is constructed from
@@ -383,16 +384,18 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
               }
             }
 
-            for (List<SegmentZKMetadata> partitionedSegments : 
partitionToSegments.values()) {
+            for (Map.Entry<List<Integer>, List<SegmentZKMetadata>> entry : 
partitionToSegments.entrySet()) {
+              List<Integer> partition = entry.getKey();
+              List<SegmentZKMetadata> partitionedSegments = entry.getValue();
               pinotTaskConfigsForTable.addAll(
                   createPinotTaskConfigs(partitionedSegments, 
offlineTableName, maxNumRecordsPerTask, mergeLevel,
-                      mergeConfigs, taskConfigs));
+                      partition, mergeConfigs, taskConfigs));
             }
 
             if (!outlierSegments.isEmpty()) {
               pinotTaskConfigsForTable.addAll(
                   createPinotTaskConfigs(outlierSegments, offlineTableName, 
maxNumRecordsPerTask, mergeLevel,
-                      mergeConfigs, taskConfigs));
+                      null, mergeConfigs, taskConfigs));
             }
           }
         }
@@ -516,8 +519,8 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
    * Create pinot task configs with selected segments and configs
    */
   private List<PinotTaskConfig> createPinotTaskConfigs(List<SegmentZKMetadata> 
selectedSegments,
-      String offlineTableName, int maxNumRecordsPerTask, String mergeLevel, 
Map<String, String> mergeConfigs,
-      Map<String, String> taskConfigs) {
+      String offlineTableName, int maxNumRecordsPerTask, String mergeLevel, 
List<Integer> partition,
+      Map<String, String> mergeConfigs, Map<String, String> taskConfigs) {
     int numRecordsPerTask = 0;
     List<List<String>> segmentNamesList = new ArrayList<>();
     List<List<String>> downloadURLsList = new ArrayList<>();
@@ -539,6 +542,15 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
     }
 
     List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>();
+
+    StringBuilder partitionSuffixBuilder = new StringBuilder();
+    if (partition != null && !partition.isEmpty()) {
+      for (int columnPartition : partition) {
+        
partitionSuffixBuilder.append(DELIMITER_IN_SEGMENT_NAME).append(columnPartition);
+      }
+    }
+    String partitionSuffix = partitionSuffixBuilder.toString();
+
     for (int i = 0; i < segmentNamesList.size(); i++) {
       Map<String, String> configs = new HashMap<>();
       configs.put(MinionConstants.TABLE_NAME_KEY, offlineTableName);
@@ -562,9 +574,13 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
       configs.put(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY,
           mergeConfigs.get(MergeTask.MAX_NUM_RECORDS_PER_SEGMENT_KEY));
 
+      // Segment name conflict happens when the current method "createPinotTaskConfigs" is invoked more than once within
+      // the same epoch millisecond, which may happen when there are multiple partitions.
+      // To prevent such name conflict, we include a partitionSeqSuffix to the segment name.
       configs.put(MergeRollupTask.SEGMENT_NAME_PREFIX_KEY,
-          MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + "_" + 
System.currentTimeMillis() + "_" + i + "_"
-              + TableNameBuilder.extractRawTableName(offlineTableName));
+          MergeRollupTask.MERGED_SEGMENT_NAME_PREFIX + mergeLevel + 
DELIMITER_IN_SEGMENT_NAME
+              + System.currentTimeMillis() + partitionSuffix + 
DELIMITER_IN_SEGMENT_NAME + i
+              + DELIMITER_IN_SEGMENT_NAME + 
TableNameBuilder.extractRawTableName(offlineTableName));
       pinotTaskConfigs.add(new PinotTaskConfig(MergeRollupTask.TASK_TYPE, 
configs));
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to