snleee commented on a change in pull request #7481:
URL: https://github.com/apache/pinot/pull/7481#discussion_r715791111



##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +349,74 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelection {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments = new ArrayList<>();

Review comment:
       The size of this list will never exceed `numParallelBuckets`, so we can 
slightly optimize it by passing that value as the initial capacity to the 
`ArrayList` constructor.
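
       Something along these lines, as a minimal sketch only. It assumes 
`numParallelBuckets` is available where the list is built; the class and 
helper names below are hypothetical and not from the PR.

```java
import java.util.ArrayList;
import java.util.List;

class PresizedListSketch {
  // Hypothetical helper: creates the outer list sized for at most numParallelBuckets buckets.
  static <T> List<List<T>> newBucketList(int numParallelBuckets) {
    // The argument is only an initial capacity; the list starts empty and can still grow.
    return new ArrayList<>(numParallelBuckets);
  }

  public static void main(String[] args) {
    List<List<String>> selectedSegments = newBucketList(3);
    List<String> bucket = new ArrayList<>();
    bucket.add("segment_0");
    bucket.add("segment_1");
    selectedSegments.add(bucket);
    System.out.println(selectedSegments.size());
  }
}
```

       The capacity only avoids internal array resizing; it does not cap the 
list, so behavior is unchanged if the bound is ever exceeded.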

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -77,20 +77,23 @@
  *        A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
  *        closest bucket start time.
  *      - The execution window for the task is calculated as,
- *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+ *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
  *      - Skip scheduling if the window is invalid:
  *        - If the execution window is not older than bufferTimeMs, no task will be generated
  *        - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
  *      - Bump up target window and watermark if needed.
  *        - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *        current window,
- *          keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *    - Select all segments for the target window
- *    - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *          current window, keep bumping up the watermark and target window until unmerged segments are found.
+ *          Else skip the scheduling.
+ *    - Select segments for each bucket in the target window:
+ *      - Skip buckets which all segments are merged
+ *      - Pick buckets till the first bucket that has spilled over data
+ *    - Create tasks per bucket (and per partition for partitioned table) based on maxNumRecordsPerTask

Review comment:
       Let's add a section on how we decided to handle spilled-over segments.
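
       For reference, here is how I read the selection steps in the Javadoc 
above, as a rough sketch rather than the PR's actual code. `Segment` is a 
hypothetical stand-in for `SegmentZKMetadata`. Two things are my assumptions: 
a segment counts as "spilled over" when it overlaps a bucket and ends at or 
past the bucket end, and the bucket containing spilled-over data is itself 
picked before selection stops.

```java
import java.util.ArrayList;
import java.util.List;

class BucketSelectionSketch {
  // Hypothetical stand-in for SegmentZKMetadata: time range plus a "merged at this level" flag.
  static final class Segment {
    final long startMs;
    final long endMs; // inclusive
    final boolean merged;

    Segment(long startMs, long endMs, boolean merged) {
      this.startMs = startMs;
      this.endMs = endMs;
      this.merged = merged;
    }
  }

  static List<List<Segment>> selectBuckets(List<Segment> segments, long watermarkMs, long bucketMs,
      int numParallelBuckets) {
    List<List<Segment>> selected = new ArrayList<>(numParallelBuckets);
    // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
    long windowEndMs = watermarkMs + bucketMs * numParallelBuckets;
    for (long bucketStartMs = watermarkMs; bucketStartMs < windowEndMs; bucketStartMs += bucketMs) {
      long bucketEndMs = bucketStartMs + bucketMs;
      List<Segment> bucket = new ArrayList<>();
      boolean hasUnmerged = false;
      boolean hasSpilledOver = false;
      for (Segment s : segments) {
        // Segment overlaps the bucket [bucketStartMs, bucketEndMs)
        if (s.startMs < bucketEndMs && s.endMs >= bucketStartMs) {
          bucket.add(s);
          hasUnmerged |= !s.merged;
          // Assumption: data reaching past the bucket end is "spilled over"
          hasSpilledOver |= s.endMs >= bucketEndMs;
        }
      }
      // Skip buckets in which all segments are already merged
      if (hasUnmerged) {
        selected.add(bucket);
      }
      // Assumption: stop after the first bucket with spilled-over data (that bucket is included)
      if (hasSpilledOver) {
        break;
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    List<Segment> segments = new ArrayList<>();
    segments.add(new Segment(0, 50, true));     // bucket 0: already merged, skipped
    segments.add(new Segment(100, 150, false)); // bucket 1: picked
    segments.add(new Segment(200, 320, false)); // bucket 2: picked, spills into bucket 3
    segments.add(new Segment(300, 350, false)); // bucket 3: not reached in this run
    System.out.println(selectBuckets(segments, 0, 100, 4).size());
  }
}
```

       Whichever way the PR actually resolves these two points, it would be 
good to state it explicitly in the Javadoc.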

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
##########
@@ -270,9 +270,81 @@ public void testMaxNumRecordsPerTask() {
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
+  }
+
+  /**
+   * Test num parallel buckets
+   */
+  @Test
+  public void testNumParallelBuckets() {

Review comment:
       Please add a test covering the new scheduling logic for the case where 
the number of scheduled buckets is less than `numParallelBuckets`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


