snleee commented on a change in pull request #7481:
URL: https://github.com/apache/pinot/pull/7481#discussion_r722467149



##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in 
_selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int 
numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());

Review comment:
       Instead of pre-adding the `ArrayList` here, we can check the size of 
`_selectedSegments` at the beginning of the `add()` call.

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -371,17 +471,26 @@ private boolean isMergedSegment(SegmentZKMetadata 
segmentZKMetadata, String merg
   }
 
   /**
-   * Check if the merge window end time is valid
+   * Get the merge window end time
    */
-  private boolean isValidMergeWindowEndTime(long windowEndMs, long bufferMs, 
String lowerMergeLevel,
-      MergeRollupTaskMetadata mergeRollupTaskMetadata) {
-    // Check that execution window endTimeMs <= now - bufferTime
-    if (windowEndMs > System.currentTimeMillis() - bufferMs) {
-      return false;
+  private long getMergeWindowEndTime(long windowStartMs, long bucketMs, long 
numParallelBuckets, long bufferMs,

Review comment:
       One edge case to think of:
   
   Let's say we have the following data available:
   
   8/10, 8/11 (no data), 8/12, 8/13 and parallelism = 3
   
   In this case, our current algorithm will schedule the window for `8/10, 8/11, 8/12`, 
and only 2 tasks will be scheduled. To fully utilize the 
parallelism, an optimized algorithm would schedule `8/10, 8/12, 8/13`. 
   
   I'm fine with not handling this, but if the fix is relatively simple, the optimized 
version would be slightly better.
   
   
   
   

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in 
_selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int 
numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the segment cannot be added and it indicates that the 
selection result is sealed,
+     * otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = 
_selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()
+          || startInSameBucket(segment.getStartTimeMs(),
+              selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 
1).getStartTimeMs(), _bucketMs)) {
+        // If the input segment belongs to current bucket, add it to the bucket
+        selectedSegmentsPerBucket.add(segment);
+      } else {
+        // The segment doesn't belong to current buckets, need to create a new 
bucket
+        if (_hasSpilledOverData) {
+          // The selection result is sealed if the last bucket has spilled 
over data
+          return false;
+        } else {
+          selectedSegmentsPerBucket = new ArrayList<>();
+          selectedSegmentsPerBucket.add(segment);
+          _selectedSegments.add(selectedSegmentsPerBucket);
+        }
+      }
+
+      if (hasSpilledOverData(segment, _bucketMs)) {
+        _hasSpilledOverData = true;
+      }
+      if (!isMergedSegment(segment, _mergeLevel)) {
+        if (!_hasUnmergedSegments) {
+          _firstUnmergedBucket = _selectedSegments.size() - 1;
+        }
+        _hasUnmergedSegments = true;
+      }
+      return true;
+    }
+
+    boolean hasUnmergedSegments() {

Review comment:
       Don't we need to track `hasUnmergedSegments` for each bucket?

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in 
_selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int 
numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the segment cannot be added and it indicates that the 
selection result is sealed,
+     * otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = 
_selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()
+          || startInSameBucket(segment.getStartTimeMs(),
+              selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 
1).getStartTimeMs(), _bucketMs)) {
+        // If the input segment belongs to current bucket, add it to the bucket
+        selectedSegmentsPerBucket.add(segment);
+      } else {
+        // The segment doesn't belong to current buckets, need to create a new 
bucket
+        if (_hasSpilledOverData) {
+          // The selection result is sealed if the last bucket has spilled 
over data
+          return false;
+        } else {
+          selectedSegmentsPerBucket = new ArrayList<>();
+          selectedSegmentsPerBucket.add(segment);
+          _selectedSegments.add(selectedSegmentsPerBucket);
+        }
+      }
+
+      if (hasSpilledOverData(segment, _bucketMs)) {
+        _hasSpilledOverData = true;

Review comment:
       If we directly `return false` here, I think that we can remove 
`_hasSpilledOverData`. At this point, we have already added the spilled-over segment to 
the current bucket's selected segment list. Why do we only indicate that the result 
is sealed in the next round of `add()`?

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -232,32 +243,31 @@ public String getTaskType() {
             long endTimeMs = preSelectedSegment.getEndTimeMs();
             if (endTimeMs >= windowStartMs) {
               // For segments overlapping with current window, add to the 
result list
-              selectedSegments.add(preSelectedSegment);
-              if (!isMergedSegment(preSelectedSegment, mergeLevel)) {
-                hasUnmergedSegments = true;
+              if (!selectedSegments.add(preSelectedSegment)) {
+                break;
               }
             }
             // endTimeMs < windowStartMs
             // Haven't find the first overlapping segment, continue to the 
next segment
           } else {
             // Has gone through all overlapping segments for current window
-            if (hasUnmergedSegments) {
+            if (selectedSegments.hasUnmergedSegments()) {
               // Found unmerged segments, schedule merge task for current 
window
               break;
             } else {
               // No unmerged segments found, clean up selected segments and 
bump up the merge window
               // TODO: If there are many small merged segments, we should 
merge them again
               selectedSegments.clear();
-              selectedSegments.add(preSelectedSegment);
-              if (!isMergedSegment(preSelectedSegment, mergeLevel)) {
-                hasUnmergedSegments = true;
-              }
               windowStartMs = startTimeMs / bucketMs * bucketMs;
-              windowEndMs = windowStartMs + bucketMs;
-              if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, 
lowerMergeLevel, mergeRollupTaskMetadata)) {
+              windowEndMs = getMergeWindowEndTime(windowStartMs, bucketMs, 
numParallelBuckets, bufferMs,
+                  lowerMergeLevel, mergeRollupTaskMetadata);
+              if (!isValidMergeWindow(windowStartMs, windowEndMs)) {
                 isValidMergeWindow = false;
                 break;
               }
+              if (!selectedSegments.add(preSelectedSegment)) {

Review comment:
       Add a comment on why we are breaking here (even though `add()` already has 
comments, it will be easier to follow if we have the comment here as well).

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in 
_selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;

Review comment:
       `_selectedSegmentsForBuckets`

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -361,6 +447,20 @@ private boolean validate(TableConfig tableConfig, String 
taskType) {
     return true;
   }
 
+  /**
+   * Check if two segments' start time belong to the same bucket
+   */
+  private boolean startInSameBucket(long startTimeMs1, long startTimeMs2, long 
bucketMs) {

Review comment:
       `startInSameBucket` -> `startAtSameBucket` or `belongToSameBucket`

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in 
_selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int 
numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the segment cannot be added and it indicates that the 
selection result is sealed,
+     * otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = 
_selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()
+          || startInSameBucket(segment.getStartTimeMs(),
+              selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 
1).getStartTimeMs(), _bucketMs)) {
+        // If the input segment belongs to current bucket, add it to the bucket
+        selectedSegmentsPerBucket.add(segment);
+      } else {
+        // The segment doesn't belong to current buckets, need to create a new 
bucket
+        if (_hasSpilledOverData) {
+          // The selection result is sealed if the last bucket has spilled 
over data
+          return false;
+        } else {
+          selectedSegmentsPerBucket = new ArrayList<>();
+          selectedSegmentsPerBucket.add(segment);
+          _selectedSegments.add(selectedSegmentsPerBucket);
+        }
+      }
+
+      if (hasSpilledOverData(segment, _bucketMs)) {
+        _hasSpilledOverData = true;
+      }
+      if (!isMergedSegment(segment, _mergeLevel)) {
+        if (!_hasUnmergedSegments) {
+          _firstUnmergedBucket = _selectedSegments.size() - 1;
+        }
+        _hasUnmergedSegments = true;
+      }
+      return true;
+    }
+
+    boolean hasUnmergedSegments() {
+      return _hasUnmergedSegments;
+    }
+
+    void clear() {
+      _hasSpilledOverData = false;
+      _selectedSegments.clear();
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    List<List<SegmentZKMetadata>> getSelectedSegments() {

Review comment:
       `getSelectedSegmentsForAllBuckets()` may be clearer?

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +357,77 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
 
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelectionResult {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in 
_selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments;
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelectionResult(long bucketMs, String mergeLevel, int 
numParallelBuckets) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments = new ArrayList<>(numParallelBuckets);
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the segment cannot be added and it indicates that the 
selection result is sealed,
+     * otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = 
_selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()
+          || startInSameBucket(segment.getStartTimeMs(),
+              selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 
1).getStartTimeMs(), _bucketMs)) {
+        // If the input segment belongs to current bucket, add it to the bucket
+        selectedSegmentsPerBucket.add(segment);
+      } else {
+        // The segment doesn't belong to current buckets, need to create a new 
bucket
+        if (_hasSpilledOverData) {
+          // The selection result is sealed if the last bucket has spilled 
over data
+          return false;
+        } else {
+          selectedSegmentsPerBucket = new ArrayList<>();
+          selectedSegmentsPerBucket.add(segment);
+          _selectedSegments.add(selectedSegmentsPerBucket);
+        }
+      }
+
+      if (hasSpilledOverData(segment, _bucketMs)) {
+        _hasSpilledOverData = true;
+      }
+      if (!isMergedSegment(segment, _mergeLevel)) {
+        if (!_hasUnmergedSegments) {
+          _firstUnmergedBucket = _selectedSegments.size() - 1;
+        }
+        _hasUnmergedSegments = true;
+      }
+      return true;
+    }
+
+    boolean hasUnmergedSegments() {
+      return _hasUnmergedSegments;
+    }
+
+    void clear() {
+      _hasSpilledOverData = false;
+      _selectedSegments.clear();
+      _selectedSegments.add(new ArrayList<>());

Review comment:
       Let's remove this line and move it into the `add()` call, as mentioned before.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to