jtao15 commented on a change in pull request #7481:
URL: https://github.com/apache/pinot/pull/7481#discussion_r725394391



##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments; segments in _selectedSegments[_firstUnmergedBucket:] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the segment cannot be added, which indicates that the selection result is sealed;
+     * otherwise true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = _selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()
+          || startInSameBucket(segment.getStartTimeMs(),
+              selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 1).getStartTimeMs(), _bucketMs)) {
+        // If the input segment belongs to the current bucket, add it to that bucket
+        selectedSegmentsPerBucket.add(segment);
+      } else {
+        // The segment doesn't belong to the current bucket, so a new bucket needs to be created
+        if (_hasSpilledOverData) {
+          // The selection result is sealed if the last bucket has spilled over data
+          return false;
+        } else {
+          selectedSegmentsPerBucket = new ArrayList<>();
+          selectedSegmentsPerBucket.add(segment);
+          _selectedSegments.add(selectedSegmentsPerBucket);
+        }
+      }
+
+      if (hasSpilledOverData(segment, _bucketMs)) {
+        _hasSpilledOverData = true;
+      }
+      if (!isMergedSegment(segment, _mergeLevel)) {
+        if (!_hasUnmergedSegments) {
+          _firstUnmergedBucket = _selectedSegments.size() - 1;
+        }
+        _hasUnmergedSegments = true;
+      }
+      return true;
+    }
+
+    boolean hasUnmergedSegments() {

Review comment:
       Updated to use `hasUnmergedSegments` to track each bucket, so we will not schedule already-merged buckets again if some buckets get stuck.
   For example:
   Buckets 8/10, 8/11, 8/12, 8/13, 8/14 with parallelism = 3.
   In the first round we schedule 8/10, 8/11 and 8/12. The task for 8/10 is stuck for over 24 hours, while 8/11 and 8/12 are merged successfully.
   In the second round the scheduler picks 8/10, 8/13 and 8/14, and skips 8/11 and 8/12.
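   A minimal, hypothetical sketch of the per-bucket scheduling behavior in the example above. It reduces each time bucket to a single "has unmerged segments" flag held in a `LinkedHashMap` (an assumption; the real generator works with lists of `SegmentZKMetadata`), and it assumes the stuck 8/10 task has already exceeded its 24-hour timeout, so that bucket is eligible again. `BucketSchedulingSketch` and `selectBuckets` are illustrative names, not Pinot APIs, and task-state handling is not modeled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: track per bucket whether it still has unmerged segments,
 * and only schedule buckets that do. Not the actual MergeRollupTaskGenerator code.
 */
public class BucketSchedulingSketch {

  // Returns up to numParallelBuckets buckets, in time order, that still have unmerged segments.
  public static List<String> selectBuckets(Map<String, Boolean> bucketHasUnmerged, int numParallelBuckets) {
    List<String> selected = new ArrayList<>();
    for (Map.Entry<String, Boolean> entry : bucketHasUnmerged.entrySet()) {
      if (selected.size() == numParallelBuckets) {
        break;
      }
      if (entry.getValue()) {
        // Bucket still has unmerged data: schedule it; fully merged buckets are skipped
        selected.add(entry.getKey());
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    // LinkedHashMap keeps buckets in insertion (time) order
    Map<String, Boolean> buckets = new LinkedHashMap<>();
    for (String day : new String[]{"8/10", "8/11", "8/12", "8/13", "8/14"}) {
      buckets.put(day, true); // every bucket starts with unmerged segments
    }

    System.out.println("Round 1: " + selectBuckets(buckets, 3)); // Round 1: [8/10, 8/11, 8/12]

    // 8/11 and 8/12 merge successfully; 8/10 is stuck (assumed timed out), so it stays unmerged
    buckets.put("8/11", false);
    buckets.put("8/12", false);

    System.out.println("Round 2: " + selectBuckets(buckets, 3)); // Round 2: [8/10, 8/13, 8/14]
  }
}
```

   Tracking the flag per bucket, rather than resuming from a single first-unmerged index, is what lets the second round skip 8/11 and 8/12 even though they sit between the stuck bucket and the new ones.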
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



