snleee commented on a change in pull request #7481: URL: https://github.com/apache/pinot/pull/7481#discussion_r715356933
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -193,18 +196,20 @@ public String getTaskType() {
         continue;
       }
-      // Get the bucket size and buffer
+      // Get the bucket size, buffer and number of parallel buckets (1 as default)
       long bucketMs =
-          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
       long bufferMs =
-          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+      int numParallelBuckets = mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null
+          ? Integer.parseInt(mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS)) : DEFAULT_NUM_PARALLEL_BUCKETS;
       // Get watermark from MergeRollupTaskMetadata ZNode
-      // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+      // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
       long waterMarkMs =
           getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel, mergeRollupTaskMetadata);
       long windowStartMs = waterMarkMs;
-      long windowEndMs = windowStartMs + bucketMs;
+      long windowEndMs = windowStartMs + bucketMs * numParallelBuckets;

Review comment:
I think that always setting `windowEndMs = windowStartMs + bucketMs * numParallelBuckets` can make the `isValidMergeWindowEndTime()` validation below fail even when there are segments that we could schedule. Please consider the following example:

current time = 9/5
buffer = 1d
numParallelBuckets = 3
currentWatermark = 9/2

We start from 9/2. In this round, we should schedule `[9/2, 9/3), [9/3, 9/4)`.
However, with the new logic, `startWindow = 9/2, endWindow = 9/5`. That window is not older than the 1d buffer, so it is no longer valid and we may not schedule any segments. Can you go through this scenario? The expected behavior is that we honor `numParallelBuckets` on a best-effort basis. (Even if the parallelism is 3, we should still be able to schedule 1 or 2 buckets, depending on the availability of windows.)

I think this approach may effectively make `"real" bufferTime = bufferTime + numParallelBuckets - 1` (think about the most basic case, where we have finished catching up and merge segments for a 1-day bucket daily).

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +349,74 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelection {

Review comment:
Do we have a better name? `SelectedSegments` or `SegmentsSelectionResult`?

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -77,20 +77,23 @@
  *    A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
  *    closest bucket start time.
  *  - The execution window for the task is calculated as,
- *    windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+ *    windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
  *  - Skip scheduling if the window is invalid:
  *    - If the execution window is not older than bufferTimeMs, no task will be generated
  *    - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
  *  - Bump up target window and watermark if needed.
  *    - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *      current window,
- *      keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *  - Select all segments for the target window
- *  - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *      current window, keep bumping up the watermark and target window until unmerged segments are found.
+ *      Else skip the scheduling.
+ *  - Select segments for each bucket in the target window:
+ *    - Skip buckets which all segments are merged
+ *    - Pick buckets till the first bucket that has spilled over data

Review comment:
This line is not clear to me. Are you trying to explain that spilled-over segments (segments spanning multiple time windows) will only be picked once, by the first time window that hits the segment? (That is, handling spilled-over segments when parallel bucket scheduling is enabled.)

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +349,74 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelection {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments = new ArrayList<>();
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelection(long bucketMs, String mergeLevel) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the selection result is sealed, otherwise return true

Review comment:
I think it's better to keep the regular contract for `add` (return true if the element was successfully added, return false otherwise). To indicate a sealed result, we can set `_isResultSealed = true` and add an `isResultSealed()` method.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +349,74 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelection {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments = new ArrayList<>();
+    int _firstUnmergedBucket = 0;
+
+    SegmentsSelection(long bucketMs, String mergeLevel) {
+      _bucketMs = bucketMs;
+      _mergeLevel = mergeLevel;
+      _selectedSegments.add(new ArrayList<>());
+    }
+
+    /**
+     * Add segment to the selection result
+     *
+     * @return false if the selection result is sealed, otherwise return true
+     */
+    boolean add(SegmentZKMetadata segment) {
+      List<SegmentZKMetadata> selectedSegmentsPerBucket = _selectedSegments.get(_selectedSegments.size() - 1);
+      if (selectedSegmentsPerBucket.isEmpty()

Review comment:
Add a comment about the logic:
```
If the input segment belongs to the latest bucket...
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
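
For reference, the scenario in the first review comment (current time = 9/5, buffer = 1d, 3 parallel buckets, watermark = 9/2) can be worked through with a small sketch. This is only an illustration under stated assumptions, not the code in the PR: `BestEffortWindow` and `schedulableBuckets` are hypothetical names, and the only validity rule modelled is that the window end must be no later than the current time minus the buffer. The check against the lower merge level's watermark is left out.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical illustration; none of these names exist in the PR.
final class BestEffortWindow {

  /**
   * Returns how many whole buckets, capped at numParallelBuckets, fit between
   * windowStartMs and (nowMs - bufferMs). Assumes a window is valid when its
   * end is at or before (nowMs - bufferMs).
   */
  static int schedulableBuckets(long windowStartMs, long bucketMs, long bufferMs, int numParallelBuckets,
      long nowMs) {
    long latestAllowedEndMs = nowMs - bufferMs;
    if (latestAllowedEndMs <= windowStartMs) {
      // Not even one bucket is older than the buffer yet.
      return 0;
    }
    long bucketsThatFit = (latestAllowedEndMs - windowStartMs) / bucketMs;
    return (int) Math.min(bucketsThatFit, numParallelBuckets);
  }

  public static void main(String[] args) {
    long dayMs = TimeUnit.DAYS.toMillis(1);
    // Days are counted from an arbitrary origin: day 2 stands for 9/2, day 5 for 9/5.
    long watermarkMs = 2 * dayMs;
    long nowMs = 5 * dayMs;
    long bufferMs = dayMs;
    int numParallelBuckets = 3;

    // Fixed-size window: always spans numParallelBuckets buckets.
    long fixedWindowEndMs = watermarkMs + dayMs * numParallelBuckets;
    long latestAllowedEndMs = nowMs - bufferMs;
    System.out.println("fixed window end = day " + fixedWindowEndMs / dayMs);       // day 5
    System.out.println("latest allowed end = day " + latestAllowedEndMs / dayMs);   // day 4

    // Best-effort window: shrink to the buckets that are actually available.
    int buckets = schedulableBuckets(watermarkMs, dayMs, bufferMs, numParallelBuckets, nowMs);
    long bestEffortEndMs = watermarkMs + dayMs * buckets;
    System.out.println("schedulable buckets = " + buckets);                         // 2
    System.out.println("best-effort window end = day " + bestEffortEndMs / dayMs);  // day 4
  }
}
```

With these numbers the fixed-size window ends on day 5, past the latest allowed end of day 4, so nothing would be scheduled. The best-effort variant yields 2 buckets, matching the `[9/2, 9/3), [9/3, 9/4)` windows from the comment.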