snleee commented on a change in pull request #7481:
URL: https://github.com/apache/pinot/pull/7481#discussion_r715791111
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -342,6 +349,74 @@ public String getTaskType() {
     return pinotTaskConfigs;
   }
+  /**
+   * Segment selection result
+   */
+  private class SegmentsSelection {
+    private long _bucketMs;
+    private String _mergeLevel;
+    private boolean _hasSpilledOverData = false;
+    private boolean _hasUnmergedSegments = false;
+    // List of buckets of segments, segments in _selectedSegments[_firstUnmergedBuckets: ] will be picked to be merged
+    List<List<SegmentZKMetadata>> _selectedSegments = new ArrayList<>();

Review comment:
       The size of this list will never exceed `numParallelBuckets`, so we can slightly optimize it by passing that size as the initial capacity.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -77,20 +77,23 @@
  *    A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
  *    closest bucket start time.
  *  - The execution window for the task is calculated as,
- *    windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+ *    windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
  *  - Skip scheduling if the window is invalid:
  *    - If the execution window is not older than bufferTimeMs, no task will be generated
  *    - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
  *  - Bump up target window and watermark if needed.
  *    - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *      current window,
- *      keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *  - Select all segments for the target window
- *  - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *      current window, keep bumping up the watermark and target window until unmerged segments are found.
+ *      Else skip the scheduling.
+ *  - Select segments for each bucket in the target window:
+ *    - Skip buckets which all segments are merged
+ *    - Pick buckets till the first bucket that has spilled over data
+ *  - Create tasks per bucket (and per partition for partitioned table) based on maxNumRecordsPerTask

Review comment:
       Let's add a section explaining how we decided to handle spilled-over segments.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
##########
@@ -270,9 +270,81 @@ public void testMaxNumRecordsPerTask() {
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
+  }
+
+  /**
+   * Test num parallel buckets
+   */
+  @Test
+  public void testNumParallelBuckets() {

Review comment:
       Add a test covering the new scheduling logic for the case where the number of scheduled buckets is less than `numParallelBuckets`.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
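The comments above revolve around the bucket-selection rule in the updated Javadoc. Below is a minimal, standalone Java sketch of that rule. It assumes a hypothetical `Bucket` summary type and hypothetical `windowEndMs`/`selectBuckets` helpers (none of these are Pinot code). It also reads "Pick buckets till the first bucket that has spilled over data" as keeping that bucket and stopping after it, which is an interpretation the diff does not confirm. The result list is presized to `numParallelBuckets`, as the first comment suggests.

```java
import java.util.ArrayList;
import java.util.List;

public class BucketSelectionSketch {
  // Hypothetical per-bucket summary (not a Pinot type): whether every segment in the
  // bucket is already merged, and whether any segment spills past the bucket end.
  record Bucket(long startMs, boolean allMerged, boolean hasSpilledOverData) {}

  // Execution window from the updated Javadoc:
  // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs * numParallelBuckets
  static long windowEndMs(long watermarkMs, long bucketTimeMs, int numParallelBuckets) {
    return watermarkMs + bucketTimeMs * numParallelBuckets;
  }

  // Hypothetical helper: picks the buckets to schedule, scanning the window in time order.
  static List<Bucket> selectBuckets(List<Bucket> bucketsInWindow, int numParallelBuckets) {
    // At most numParallelBuckets buckets can be selected, so presize the list accordingly.
    List<Bucket> selected = new ArrayList<>(numParallelBuckets);
    for (Bucket bucket : bucketsInWindow) {
      if (selected.size() == numParallelBuckets) {
        break;
      }
      if (bucket.allMerged()) {
        continue; // skip buckets whose segments are all merged
      }
      selected.add(bucket);
      if (bucket.hasSpilledOverData()) {
        // Assumption: keep this bucket but stop here, so later buckets are not scheduled in this run.
        break;
      }
    }
    return selected;
  }

  public static void main(String[] args) {
    long dayMs = 86_400_000L;
    int numParallelBuckets = 4;
    List<Bucket> buckets = List.of(
        new Bucket(0L, true, false),          // fully merged: skipped
        new Bucket(dayMs, false, false),      // selected
        new Bucket(2 * dayMs, false, true),   // selected, then selection stops
        new Bucket(3 * dayMs, false, false)); // not reached
    System.out.println(windowEndMs(0L, dayMs, numParallelBuckets)); // 345600000
    System.out.println(selectBuckets(buckets, numParallelBuckets).size()); // 2
  }
}
```

In the usage example only two of the four buckets in the window are selected, which is the "fewer scheduled buckets than `numParallelBuckets`" case the last comment asks a test for.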