jtao15 commented on a change in pull request #7481:
URL: https://github.com/apache/pinot/pull/7481#discussion_r726452828



##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -212,106 +221,129 @@ public String getTaskType() {
           continue;
         }
 
-        // Get the bucket size and buffer
+        // Get the bucket size, buffer and number of parallel buckets (1 as 
default)
         long bucketMs =
-            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
         if (bucketMs <= 0) {
           LOGGER.error("Bucket time period (table : {}, mergeLevel : {}) must 
be larger than 0", offlineTableName,
               mergeLevel);
           continue;
         }
         long bufferMs =
-            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+        int numParallelBuckets = 
mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null
+            ? 
Integer.parseInt(mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS)) : 
DEFAULT_NUM_PARALLEL_BUCKETS;
 
         // Get watermark from MergeRollupTaskMetadata ZNode
-        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + 
bucketTimeMs
+        // bucketStartMs = watermarkMs
+        // bucketEndMs = bucketStartMs + bucketTimeMs
         long waterMarkMs =
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), 
bucketMs, mergeLevel, mergeRollupTaskMetadata);
-        long windowStartMs = waterMarkMs;
-        long windowEndMs = windowStartMs + bucketMs;
-
-        if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, lowerMergeLevel, 
mergeRollupTaskMetadata)) {
-          LOGGER.info("Window with start: {} and end: {} of mergeLevel: {} is 
not a valid merge window, Skipping task "
-              + "generation: {}", windowStartMs, windowEndMs, mergeLevel, 
taskType);
+        long bucketStartMs = waterMarkMs;
+        long bucketEndMs = getBucketEndTime(bucketStartMs, bucketMs, bufferMs, 
lowerMergeLevel,
+            mergeRollupTaskMetadata);
+
+        if (!isValidBucket(bucketStartMs, bucketEndMs)) {
+          LOGGER.info(
+              "Bucket with start: {} and end: {} of mergeLevel: {} is not a 
valid merge window, Skipping task "
+                  + "generation: {}",
+              bucketStartMs, bucketEndMs, mergeLevel, taskType);
           continue;
         }
 
-        // Find all segments overlapping with the merge window, if all 
overlapping segments are merged, bump up the
-        // target window
-        List<SegmentZKMetadata> selectedSegments = new ArrayList<>();
+        //

Review comment:
       Updated the comment.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to