jtao15 commented on a change in pull request #7481:
URL: https://github.com/apache/pinot/pull/7481#discussion_r717064739



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +349,74 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelection {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments = new ArrayList<>();
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelection(long bucketMs, String mergeLevel) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the selection result is sealed, otherwise return true

Review comment:
       Updated the comment to say that the method returns `false` if the segment cannot be added, which indicates that the selection result is sealed.
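
The return contract described above can be sketched as follows. This is a minimal illustration, not the PR's class: `SealableSelection`, its `String` segment names, and the size-based sealing rule are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the selection result discussed above; not the PR's class.
final class SealableSelection {
  private final List<String> _segments = new ArrayList<>();
  // Assumed sealing rule (a simple size cap), used only to have something that seals.
  private final int _maxSegments;
  private boolean _sealed = false;

  SealableSelection(int maxSegments) {
    _maxSegments = maxSegments;
  }

  /**
   * Adds a segment to the selection.
   *
   * @return false if the segment cannot be added, which means the selection is sealed; true otherwise
   */
  boolean addSegment(String segmentName) {
    if (_sealed) {
      return false;
    }
    if (_segments.size() >= _maxSegments) {
      // The first rejected segment seals the selection; later calls also return false.
      _sealed = true;
      return false;
    }
    _segments.add(segmentName);
    return true;
  }

  boolean isSealed() {
    return _sealed;
  }

  List<String> getSegments() {
    return _segments;
  }
}
```

A caller can stop iterating over candidate segments as soon as `addSegment` returns `false`, without checking a separate flag.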

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -77,20 +77,23 @@
  *        A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
  *        closest bucket start time.
  *      - The execution window for the task is calculated as,
- *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+ *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
  *      - Skip scheduling if the window is invalid:
  *        - If the execution window is not older than bufferTimeMs, no task will be generated
  *        - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
  *      - Bump up target window and watermark if needed.
  *        - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *        current window,
- *          keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *    - Select all segments for the target window
- *    - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *          current window, keep bumping up the watermark and target window until unmerged segments are found.
+ *          Else skip the scheduling.
+ *    - Select segments for each bucket in the target window:
+ *      - Skip buckets which all segments are merged
+ *      - Pick buckets till the first bucket that has spilled over data

Review comment:
       I was trying to explain that the picking process stops at the first bucket that has spilled-over data. Updated the comment; hope it's clear now.
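
To make the stopping rule concrete, here is a minimal sketch of the bucket-picking loop. The `Bucket` type and its fields are hypothetical, and including the spilled-over bucket itself in the result is an assumption; the comment above only says that picking stops there.

```java
import java.util.ArrayList;
import java.util.List;

final class BucketPicker {
  // Hypothetical per-bucket summary; the field names are illustrative only.
  static final class Bucket {
    final long startMs;
    final boolean allSegmentsMerged;
    final boolean hasSpilledOverData;

    Bucket(long startMs, boolean allSegmentsMerged, boolean hasSpilledOverData) {
      this.startMs = startMs;
      this.allSegmentsMerged = allSegmentsMerged;
      this.hasSpilledOverData = hasSpilledOverData;
    }
  }

  /** Returns the buckets to schedule, in window order. */
  static List<Bucket> pick(List<Bucket> bucketsInWindow) {
    List<Bucket> picked = new ArrayList<>();
    for (Bucket bucket : bucketsInWindow) {
      if (bucket.allSegmentsMerged) {
        // Nothing left to merge in this bucket, so skip it.
        continue;
      }
      picked.add(bucket);
      if (bucket.hasSpilledOverData) {
        // Assumption: the spilled-over bucket is picked, and no later bucket is.
        break;
      }
    }
    return picked;
  }
}
```

With buckets `[merged, unmerged, spilled-over, unmerged]`, this picks the second and third buckets and leaves the fourth for a later round.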

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -77,20 +77,23 @@
  *        A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
  *        closest bucket start time.
  *      - The execution window for the task is calculated as,
- *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+ *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
  *      - Skip scheduling if the window is invalid:
  *        - If the execution window is not older than bufferTimeMs, no task will be generated
  *        - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
  *      - Bump up target window and watermark if needed.
  *        - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *        current window,
- *          keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *    - Select all segments for the target window
- *    - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *          current window, keep bumping up the watermark and target window until unmerged segments are found.
+ *          Else skip the scheduling.
+ *    - Select segments for each bucket in the target window:
+ *      - Skip buckets which all segments are merged
+ *      - Pick buckets till the first bucket that has spilled over data
+ *    - Create tasks per bucket (and per partition for partitioned table) based on maxNumRecordsPerTask

Review comment:
       Added comments about the spilled over data handling.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -193,18 +196,20 @@ public String getTaskType() {
           continue;
         }
 
-        // Get the bucket size and buffer
+        // Get the bucket size, buffer and number of parallel buckets (1 as default)
         long bucketMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
         long bufferMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+        int numParallelBuckets = mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null
+            ? Integer.parseInt(mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS)) : DEFAULT_NUM_PARALLEL_BUCKETS;
 
         // Get watermark from MergeRollupTaskMetadata ZNode
-        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
         long waterMarkMs =
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel, mergeRollupTaskMetadata);
         long windowStartMs = waterMarkMs;
-        long windowEndMs = windowStartMs + bucketMs;
+        long windowEndMs = windowStartMs + bucketMs * numParallelBuckets;

Review comment:
       Good point. Updated to use a helper function to get the `endWindow`, which honors the buffer time and watermarks, so tasks are scheduled for up to `numParallelBuckets` buckets on a best-effort basis.
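
A best-effort window end of this kind could look roughly like the sketch below. The class, method name, and signature are hypothetical rather than the helper in the PR. The two validity checks are taken from the class Javadoc: the window must be older than `bufferTimeMs`, and it must not pass the lower merge level's watermark.

```java
final class WindowEnd {
  /**
   * Extends the window one bucket at a time, up to numParallelBuckets buckets, and stops as soon
   * as the next bucket would fall inside the buffer or go past the lower merge level's watermark.
   * Returns windowStartMs when not even one bucket is eligible, i.e. an empty window.
   */
  static long bestEffortWindowEndMs(long windowStartMs, long bucketMs, int numParallelBuckets,
      long bufferMs, long nowMs, long lowerLevelWatermarkMs) {
    long windowEndMs = windowStartMs;
    for (int i = 0; i < numParallelBuckets; i++) {
      long candidateEndMs = windowEndMs + bucketMs;
      boolean insideBuffer = candidateEndMs > nowMs - bufferMs;
      boolean pastLowerLevel = candidateEndMs > lowerLevelWatermarkMs;
      if (insideBuffer || pastLowerLevel) {
        break;
      }
      windowEndMs = candidateEndMs;
    }
    return windowEndMs;
  }
}
```

For the lowest merge level, which has no lower level to wait for, a caller could pass `Long.MAX_VALUE` as `lowerLevelWatermarkMs`. For example, with `windowStartMs = 0`, `bucketMs = 10`, `numParallelBuckets = 3`, `bufferMs = 5` and `nowMs = 28`, only two buckets fit and the window ends at 20.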

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -193,18 +196,20 @@ public String getTaskType() {
           continue;
         }
 
-        // Get the bucket size and buffer
+        // Get the bucket size, buffer and number of parallel buckets (1 as 
default)
         long bucketMs =
-            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
         long bufferMs =
-            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+        int numParallelBuckets = 
mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null
+            ? 
Integer.parseInt(mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS)) : 
DEFAULT_NUM_PARALLEL_BUCKETS;
 
         // Get watermark from MergeRollupTaskMetadata ZNode
-        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + 
bucketTimeMs
+        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + 
bucketTimeMs * numParallelBuckets
         long waterMarkMs =
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), 
bucketMs, mergeLevel, mergeRollupTaskMetadata);
         long windowStartMs = waterMarkMs;
-        long windowEndMs = windowStartMs + bucketMs;
+        long windowEndMs = windowStartMs + bucketMs * numParallelBuckets;

Review comment:
       > +1. I feel the algorithm should be loop over bucket one by one, and 
keep tracking if there are spill over segments.
   > 
   > Alternatively, we might also be able to put multiple buckets into the same 
task as long as it follows the `maxNumRecordsPerTask`.
   
   

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -193,18 +196,20 @@ public String getTaskType() {
           continue;
         }
 
-        // Get the bucket size and buffer
+        // Get the bucket size, buffer and number of parallel buckets (1 as default)
         long bucketMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
         long bufferMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+        int numParallelBuckets = mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null
+            ? Integer.parseInt(mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS)) : DEFAULT_NUM_PARALLEL_BUCKETS;
 
         // Get watermark from MergeRollupTaskMetadata ZNode
-        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
         long waterMarkMs =
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel, mergeRollupTaskMetadata);
         long windowStartMs = waterMarkMs;
-        long windowEndMs = windowStartMs + bucketMs;
+        long windowEndMs = windowStartMs + bucketMs * numParallelBuckets;

Review comment:
       > +1. I feel the algorithm should be loop over bucket one by one, and 
keep tracking if there are spill over segments.
   > 
   > Alternatively, we might also be able to put multiple buckets into the same 
task as long as it follows the `maxNumRecordsPerTask`.
   
   I'm trying to keep the buckets in the same structure, `SegmentSelectionResult`, so that it fits into the current code easily; the structure maintains the stats related to spilled-over and merged segments.
   IMO, it would be simpler and cleaner if one task handles only one time bucket instead of multiple.
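
As a rough illustration of the one-bucket-per-task approach, the sketch below splits each bucket into tasks on its own, so that no task spans two buckets. The `Segment` type is hypothetical, and the splitting rule is an assumption: a new task starts when adding the next segment would exceed `maxNumRecordsPerTask`. The generator's real rule may differ.

```java
import java.util.ArrayList;
import java.util.List;

final class PerBucketTasks {
  // Hypothetical minimal segment descriptor used only for this sketch.
  static final class Segment {
    final String name;
    final long numRecords;

    Segment(String name, long numRecords) {
      this.name = name;
      this.numRecords = numRecords;
    }
  }

  /** Builds tasks bucket by bucket; each returned list is the segment set of one task. */
  static List<List<Segment>> createTasks(List<List<Segment>> buckets, long maxNumRecordsPerTask) {
    List<List<Segment>> tasks = new ArrayList<>();
    for (List<Segment> bucket : buckets) {
      List<Segment> current = new ArrayList<>();
      long records = 0;
      for (Segment segment : bucket) {
        // Assumed rule: close the current task before it would exceed the record limit.
        if (!current.isEmpty() && records + segment.numRecords > maxNumRecordsPerTask) {
          tasks.add(current);
          current = new ArrayList<>();
          records = 0;
        }
        current.add(segment);
        records += segment.numRecords;
      }
      // Flush at the bucket boundary so that a task never mixes two buckets.
      if (!current.isEmpty()) {
        tasks.add(current);
      }
    }
    return tasks;
  }
}
```

The alternative raised in the quoted comment, packing several buckets into one task, would amount to dropping the flush at the bucket boundary.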




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


