jtao15 commented on a change in pull request #7481: URL: https://github.com/apache/pinot/pull/7481#discussion_r725394391
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -342,6 +357,77 @@ public String getTaskType() { return pinotTaskConfigs; } + /** + * Segment selection result + */ + private class SegmentsSelectionResult { + private long _bucketMs; + private String _mergeLevel; + private boolean _hasSpilledOverData = false; + private boolean _hasUnmergedSegments = false; + // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged + List<List<SegmentZKMetadata>> _selectedSegments; + int _firstUnmergedBucket = 0; + + SegmentsSelectionResult(long bucketMs, String mergeLevel, int numParallelBuckets) { + _bucketMs = bucketMs; + _mergeLevel = mergeLevel; + _selectedSegments = new ArrayList<>(numParallelBuckets); + _selectedSegments.add(new ArrayList<>()); + } + + /** + * Add segment to the selection result + * + * @return false if the segment cannot be added and it indicates that the selection result is sealed, + * otherwise return true + */ + boolean add(SegmentZKMetadata segment) { + List<SegmentZKMetadata> selectedSegmentsPerBucket = _selectedSegments.get(_selectedSegments.size() - 1); + if (selectedSegmentsPerBucket.isEmpty() + || startInSameBucket(segment.getStartTimeMs(), + selectedSegmentsPerBucket.get(selectedSegmentsPerBucket.size() - 1).getStartTimeMs(), _bucketMs)) { + // If the input segment belongs to current bucket, add it to the bucket + selectedSegmentsPerBucket.add(segment); + } else { + // The segment doesn't belong to current buckets, need to create a new bucket + if (_hasSpilledOverData) { + // The selection result is sealed if the last bucket has spilled over data + return false; + } else { + selectedSegmentsPerBucket = new ArrayList<>(); + selectedSegmentsPerBucket.add(segment); + _selectedSegments.add(selectedSegmentsPerBucket); + } + } + + if (hasSpilledOverData(segment, 
_bucketMs)) { + _hasSpilledOverData = true; + } + if (!isMergedSegment(segment, _mergeLevel)) { + if (!_hasUnmergedSegments) { + _firstUnmergedBucket = _selectedSegments.size() - 1; + } + _hasUnmergedSegments = true; + } + return true; + } + + boolean hasUnmergedSegments() { Review comment: Updated to use `hasUnmergedSegments` to track each bucket, so we will not schedule already-merged buckets again if some buckets get stuck. For example, with buckets 8/10, 8/11, 8/12, 8/13, 8/14 and parallelism = 3: in the first round we schedule 8/10, 8/11 and 8/12. 8/10 is stuck for over 24 hours, while 8/11 and 8/12 are merged successfully. In the second round the scheduler will pick 8/10, 8/13 and 8/14 and skip 8/11 and 8/12. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org