snleee commented on a change in pull request #7481:
URL: https://github.com/apache/pinot/pull/7481#discussion_r715356933



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -193,18 +196,20 @@ public String getTaskType() {
           continue;
         }
 
-        // Get the bucket size and buffer
+        // Get the bucket size, buffer and number of parallel buckets (1 as default)
         long bucketMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
         long bufferMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+        int numParallelBuckets = mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null
+            ? Integer.parseInt(mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS)) : DEFAULT_NUM_PARALLEL_BUCKETS;
 
         // Get watermark from MergeRollupTaskMetadata ZNode
-        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
         long waterMarkMs =
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel, mergeRollupTaskMetadata);
         long windowStartMs = waterMarkMs;
-        long windowEndMs = windowStartMs + bucketMs;
+        long windowEndMs = windowStartMs + bucketMs * numParallelBuckets;

Review comment:
      I think that always setting `windowEndMs = windowStartMs + bucketMs * numParallelBuckets` can cause the validation below (`isValidMergeWindowEndTime()`) to fail even when there are segments we could schedule. Consider the following example:
   
   current time = 9/5
   buffer = 1d
   numParallelBuckets = 3
   currentWatermark = 9/2
   
   We start from 9/2. Since current time minus buffer is 9/4, the window can extend at most to 9/4, so in this round we should schedule `[9/2, 9/3)` and `[9/3, 9/4)`.
   
   However, with the new logic, `windowStartMs = 9/2, windowEndMs = 9/5`, which is no longer valid, so we may not schedule any segments at all.
   
   Can you walk through this scenario? The expected behavior is to honor `numParallelBuckets` on a best-effort basis: even if the parallelism is 3, we should still be able to schedule 1 or 2 buckets, depending on how many windows are available.
   
   I think this approach may effectively make the `"real" bufferTime = bufferTime + (numParallelBuckets - 1) * bucketTime` (consider the most basic case, where we have finished catching up and merge segments into 1-day buckets daily).
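   The effective-buffer claim can be checked with a small simulation. This is a sketch under the same simplified validity rule (`windowEndMs <= currentTimeMs - bufferMs`); `observedBufferMs` is hypothetical. It advances the clock hour by hour until the fixed-size window `[watermarkMs, watermarkMs + bucketMs * numParallelBuckets)` becomes valid, then measures how old the end of the first bucket is at that moment.

```java
import java.time.Duration;

public class EffectiveBuffer {
  // Fixed-size window as in the diff; the validity rule is a simplifying assumption
  static boolean isFixedWindowValid(long watermarkMs, long bucketMs, int numParallelBuckets,
      long bufferMs, long nowMs) {
    return watermarkMs + bucketMs * numParallelBuckets <= nowMs - bufferMs;
  }

  // Returns how old the first bucket's end is when it first gets scheduled.
  // Exact only when bucketMs and bufferMs are whole hours (the simulation step).
  static long observedBufferMs(long bucketMs, int numParallelBuckets, long bufferMs) {
    long watermarkMs = 0;
    long stepMs = Duration.ofHours(1).toMillis();
    long nowMs = watermarkMs;
    while (!isFixedWindowValid(watermarkMs, bucketMs, numParallelBuckets, bufferMs, nowMs)) {
      nowMs += stepMs;
    }
    return nowMs - (watermarkMs + bucketMs);
  }

  public static void main(String[] args) {
    long dayMs = Duration.ofDays(1).toMillis();
    // 1-day buckets, 1-day buffer, numParallelBuckets = 3
    long observedMs = observedBufferMs(dayMs, 3, dayMs);
    System.out.println("observed buffer = " + observedMs / dayMs + " day(s)");
  }
}
```

   With 1-day buckets, a 1-day buffer and 3 parallel buckets, the first bucket is scheduled only once its end is 3 days old, matching `1d + (3 - 1) * 1d`.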

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +349,74 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelection {

Review comment:
      Is there a better name, e.g. `SelectedSegments` or `SegmentsSelectionResult`?

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -77,20 +77,23 @@
  *        A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
  *        closest bucket start time.
  *      - The execution window for the task is calculated as,
- *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+ *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
  *      - Skip scheduling if the window is invalid:
  *        - If the execution window is not older than bufferTimeMs, no task will be generated
  *        - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
  *      - Bump up target window and watermark if needed.
  *        - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *        current window,
- *          keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *    - Select all segments for the target window
- *    - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *          current window, keep bumping up the watermark and target window until unmerged segments are found.
+ *          Else skip the scheduling.
+ *    - Select segments for each bucket in the target window:
+ *      - Skip buckets which all segments are merged
+ *      - Pick buckets till the first bucket that has spilled over data

Review comment:
      This line is not clear to me. Are you trying to explain that a spilled-over segment (a segment spanning multiple time windows) will only be picked once, by the first time window that hits it? (i.e., how spilled-over segments are handled when parallel bucket scheduling is enabled)
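   One possible reading of the Javadoc, sketched here as an assumption rather than the PR's actual logic: buckets are visited in time order, leading buckets whose segments are all merged are skipped, and selection stops right after the first picked bucket that contains a segment ending at or past the bucket boundary. Under that reading a spilled-over segment is picked only by the bucket it starts in, and no later bucket overlapping it is scheduled in the same round. `Segment` and `selectBuckets` are hypothetical stand-ins for `SegmentZKMetadata` and the PR's selection code.

```java
import java.util.ArrayList;
import java.util.List;

public class SpilloverSelection {
  /** Hypothetical stand-in for SegmentZKMetadata with only the fields this sketch needs. */
  static final class Segment {
    final String name;
    final long startMs;
    final long endMs;      // inclusive end time of the data in the segment
    final boolean merged;  // true if already merged at this merge level

    Segment(String name, long startMs, long endMs, boolean merged) {
      this.name = name;
      this.startMs = startMs;
      this.endMs = endMs;
      this.merged = merged;
    }
  }

  /**
   * bucketsInOrder.get(i) holds the segments whose start time falls in
   * [windowStartMs + i * bucketMs, windowStartMs + (i + 1) * bucketMs).
   */
  static List<List<Segment>> selectBuckets(List<List<Segment>> bucketsInOrder, long windowStartMs,
      long bucketMs, int numParallelBuckets) {
    List<List<Segment>> selected = new ArrayList<>();
    boolean foundUnmergedBucket = false;
    for (int i = 0; i < bucketsInOrder.size() && selected.size() < numParallelBuckets; i++) {
      List<Segment> bucket = bucketsInOrder.get(i);
      // Skip leading buckets in which every segment is already merged
      if (!foundUnmergedBucket && bucket.stream().allMatch(s -> s.merged)) {
        continue;
      }
      foundUnmergedBucket = true;
      selected.add(bucket);
      long bucketEndMs = windowStartMs + (i + 1) * bucketMs;
      // Spilled-over data: stop so later buckets overlapping this segment wait for the next round
      if (bucket.stream().anyMatch(s -> s.endMs >= bucketEndMs)) {
        break;
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    List<List<Segment>> buckets = List.of(
        List.of(new Segment("s0", 0, 9, true)),     // bucket [0, 10): already merged, skipped
        List.of(new Segment("s1", 10, 25, false)),  // bucket [10, 20): spills into [20, 30)
        List.of(new Segment("s2", 20, 29, false))); // bucket [20, 30): not picked this round
    for (List<Segment> bucket : selectBuckets(buckets, 0, 10, 3)) {
      bucket.forEach(s -> System.out.println(s.name));
    }
  }
}
```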

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +349,74 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelection {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments = new ArrayList<>();
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelection(long bucketMs, String mergeLevel) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the selection result is sealed, otherwise return true

Review comment:
      I think it's better to keep the regular contract for `add` (return true if the segment was successfully added, false otherwise).
   To indicate a sealed result, we can set `_isResultSealed = true` and add an `isResultSealed()` method.
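   A minimal sketch of the suggested contract, with stated simplifications: segments are reduced to their start times and assumed to arrive in ascending start-time order, and the result is treated as sealed once a segment would open a bucket beyond `maxNumBuckets`. `SealableSelection`, its constructor and `numBuckets()` are hypothetical; only `_isResultSealed` and `isResultSealed()` come from the suggestion above.

```java
import java.util.ArrayList;
import java.util.List;

public class SealableSelection {
  private final long _bucketMs;
  private final int _maxNumBuckets;
  // Each inner list holds the start times of the segments in one bucket, in time order
  private final List<List<Long>> _buckets = new ArrayList<>();
  private long _lastBucketStartMs;
  private boolean _isResultSealed = false;

  SealableSelection(long bucketMs, int maxNumBuckets) {
    _bucketMs = bucketMs;
    _maxNumBuckets = maxNumBuckets;
  }

  /** Regular add contract: returns true if the segment was added, false otherwise. */
  boolean add(long segmentStartMs) {
    if (_isResultSealed) {
      return false;
    }
    long bucketStartMs = Math.floorDiv(segmentStartMs, _bucketMs) * _bucketMs;
    // If the segment belongs to the latest bucket, append it there; otherwise it opens a new bucket
    boolean opensNewBucket = _buckets.isEmpty() || bucketStartMs != _lastBucketStartMs;
    if (opensNewBucket) {
      if (_buckets.size() == _maxNumBuckets) {
        // No room for another bucket: seal the result and reject the segment
        _isResultSealed = true;
        return false;
      }
      _buckets.add(new ArrayList<>());
      _lastBucketStartMs = bucketStartMs;
    }
    _buckets.get(_buckets.size() - 1).add(segmentStartMs);
    return true;
  }

  /** Whether the selection accepts no further segments. */
  boolean isResultSealed() {
    return _isResultSealed;
  }

  int numBuckets() {
    return _buckets.size();
  }

  public static void main(String[] args) {
    SealableSelection selection = new SealableSelection(10, 2);
    for (long startMs : new long[] {1, 5, 12, 25, 27}) {
      // The caller tells "not added" apart from "sealed" via isResultSealed()
      if (!selection.add(startMs) && selection.isResultSealed()) {
        System.out.println("sealed before segment starting at " + startMs);
        break;
      }
    }
    System.out.println(selection.numBuckets() + " bucket(s) selected");
  }
}
```

   Keeping the sealed state in a separate flag lets `add` stay a plain success/failure signal, and the caller can decide whether a rejection means "stop selecting" by checking `isResultSealed()`.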

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +349,74 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelection {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments = new ArrayList<>();
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelection(long bucketMs, String mergeLevel) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the selection result is sealed, otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = _selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()

Review comment:
      Please add a comment explaining the logic, e.g.:
   ```
   If the input segment belongs to the latest bucket...
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


