snleee commented on a change in pull request #7481: URL: https://github.com/apache/pinot/pull/7481#discussion_r725438368
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
##########
@@ -270,9 +270,121 @@ public void testMaxNumRecordsPerTask() {
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3));
     pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, "daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat", "1d", null, "1000000");
+  }
+
+  /**
+   * Test num parallel buckets
+   */
+  @Test
+  public void testNumParallelBuckets() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put("daily.mergeType", "concat");
+    tableTaskConfigs.put("daily.bufferTimePeriod", "2d");
+    tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
+    tableTaskConfigs.put("daily.numParallelBuckets", "3");
+    tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
+    taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
+    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);
+
+    String segmentName1 = "testTable__1";
+    String segmentName2 = "testTable__2";
+    String segmentName3 = "testTable__3";
+    String segmentName4 = "testTable__4";
+    String segmentName5 = "testTable__5";
+    SegmentZKMetadata metadata1 =
+        getSegmentZKMetadata(segmentName1, 86_400_000L, 90_000_000L, TimeUnit.MILLISECONDS, "download1");
+    SegmentZKMetadata metadata2 =
+        getSegmentZKMetadata(segmentName2, 86_400_000L, 100_000_000L, TimeUnit.MILLISECONDS, "download2");
+    SegmentZKMetadata metadata3 =
+        getSegmentZKMetadata(segmentName3, 172_800_000L, 173_000_000L, TimeUnit.MILLISECONDS, "download3");
+    SegmentZKMetadata metadata4 =
+        getSegmentZKMetadata(segmentName4, 259_200_000L, 260_000_000L, TimeUnit.MILLISECONDS, "download4");
+    SegmentZKMetadata metadata5 =
+        getSegmentZKMetadata(segmentName5, 345_600_000L, 346_000_000L, TimeUnit.MILLISECONDS, "download5");
+
+    // No spilled over data
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5));
+    MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
+    generator.init(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has spilled over data
+    String segmentName6 = "testTable__6";
+    SegmentZKMetadata metadata6 =
+        getSegmentZKMetadata(segmentName6, 172_800_000L, 260_000_000L, TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5, metadata6));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 2);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + "," + segmentName6, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has time bucket without overlapping segments
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata4, metadata5));
+    pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + "," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName4, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName5, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Heading buckets are merged

Review comment:
For completeness, can we cover the case where we have a non-merged segment in the middle (i.e. a middle task failed while we were using parallel scheduling)? Here is the case that you described (we can add a test case for a similar scenario; a sketch follows below):
```
For example: 8/10, 8/11, 8/12, 8/13, 8/14 and parallelism = 3
First round we schedule 8/10, 8/11, 8/12. 8/10 is stuck for over 24 hours, and 8/11, 8/12 are merged successfully.
Second round the scheduler will pick 8/10, 8/13 and 8/14 and skip 8/11 and 8/12.
```

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -212,106 +221,129 @@ public String getTaskType() {
         continue;
       }
 
-      // Get the bucket size and buffer
+      // Get the bucket size, buffer and number of parallel buckets (1 as default)
       long bucketMs =
-          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
       if (bucketMs <= 0) {
         LOGGER.error("Bucket time period (table : {}, mergeLevel : {}) must be larger than 0", offlineTableName,
             mergeLevel);
         continue;
       }
       long bufferMs =
-          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+      int numParallelBuckets = mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null

Review comment:
Let's add the precondition check `Preconditions.checkState(numParallelBuckets > 0, "numParallelBuckets has to be larger than 0")`, since we later access `selectedSegmentsForAllBuckets.get(0)` under the assumption that `selectedSegmentsForAllBuckets` is never empty.
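To make the suggested guard concrete, here is a minimal sketch of the parse-plus-precondition. The constant and config names are taken from the hunk above; the wrapper class, method name, and message text are illustrative, not the PR's final code:
```java
import com.google.common.base.Preconditions;

import java.util.Map;

final class NumParallelBucketsGuard {
  static final int DEFAULT_NUM_PARALLEL_BUCKETS = 1;

  // Parse numParallelBuckets from the merge configs (default 1) and fail fast on
  // non-positive values, so that later accesses such as
  // selectedSegmentsForAllBuckets.get(0) can safely assume a non-empty list.
  static int resolveNumParallelBuckets(Map<String, String> mergeConfigs, String numParallelBucketsKey) {
    String configuredValue = mergeConfigs.get(numParallelBucketsKey);
    int numParallelBuckets =
        configuredValue != null ? Integer.parseInt(configuredValue) : DEFAULT_NUM_PARALLEL_BUCKETS;
    Preconditions.checkState(numParallelBuckets > 0,
        "numParallelBuckets has to be larger than 0, got: %s", numParallelBuckets);
    return numParallelBuckets;
  }
}
```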
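And, for the first comment above, a hypothetical sketch of a test covering a non-merged bucket in the middle. It reuses the helpers visible in the test diff (`getOfflineTableConfig`, `getSegmentZKMetadata`, `checkPinotTaskConfig`); marking days 2 and 3 as already merged via the `mergeRollupTask.mergeLevel` custom-map key follows the generator's Javadoc quoted below, but the exact marker mechanics and expected task ordering are assumptions to verify:
```java
// Hypothetical addition to MergeRollupTaskGeneratorTest; assumes the class's existing
// imports plus java.util.Collections.
@Test
public void testNonMergedBucketInTheMiddle() {
  // Same daily config as testNumParallelBuckets, with 3 parallel buckets
  Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
  Map<String, String> tableTaskConfigs = new HashMap<>();
  tableTaskConfigs.put("daily.mergeType", "concat");
  tableTaskConfigs.put("daily.bufferTimePeriod", "2d");
  tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
  tableTaskConfigs.put("daily.numParallelBuckets", "3");
  tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
  taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, tableTaskConfigs);
  TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
  ClusterInfoAccessor mockClusterInfoProvide = mock(ClusterInfoAccessor.class);

  // Five daily segments; day 1 is still unmerged (its earlier task is assumed stuck),
  // days 2 and 3 already merged successfully, days 4 and 5 not yet scheduled.
  SegmentZKMetadata metadata1 =
      getSegmentZKMetadata("testTable__1", 86_400_000L, 90_000_000L, TimeUnit.MILLISECONDS, null);
  SegmentZKMetadata metadata2 =
      getSegmentZKMetadata("testTable__2", 172_800_000L, 173_000_000L, TimeUnit.MILLISECONDS, null);
  SegmentZKMetadata metadata3 =
      getSegmentZKMetadata("testTable__3", 259_200_000L, 260_000_000L, TimeUnit.MILLISECONDS, null);
  SegmentZKMetadata metadata4 =
      getSegmentZKMetadata("testTable__4", 345_600_000L, 346_000_000L, TimeUnit.MILLISECONDS, null);
  SegmentZKMetadata metadata5 =
      getSegmentZKMetadata("testTable__5", 432_000_000L, 433_000_000L, TimeUnit.MILLISECONDS, null);
  // Assumed marker: segment ZK metadata {mergeRollupTask.mergeLevel: daily} means "already merged"
  metadata2.setCustomMap(Collections.singletonMap("mergeRollupTask.mergeLevel", DAILY));
  metadata3.setCustomMap(Collections.singletonMap("mergeRollupTask.mergeLevel", DAILY));

  when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
      .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, metadata4, metadata5));
  MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
  generator.init(mockClusterInfoProvide);

  // Expected: the merged buckets (days 2 and 3) are skipped, so the three parallel
  // buckets picked are day 1 (the retry), day 4 and day 5.
  List<PinotTaskConfig> pinotTaskConfigs = generator.generateTasks(Lists.newArrayList(offlineTableConfig));
  assertEquals(pinotTaskConfigs.size(), 3);
  checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), "testTable__1", DAILY, "concat", "1d", null, "1000000");
  checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), "testTable__4", DAILY, "concat", "1d", null, "1000000");
  checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), "testTable__5", DAILY, "concat", "1d", null, "1000000");
}
```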
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -65,36 +66,44 @@
  *
  * - Pre-select segments:
  *   - Fetch all segments, select segments based on segment lineage (removing segmentsFrom for COMPLETED lineage
- *     entry and
- *     segmentsTo for IN_PROGRESS lineage entry)
+ *     entry and segmentsTo for IN_PROGRESS lineage entry)
  *   - Remove empty segments
  *   - Sort segments based on startTime and endTime in ascending order
  *
  * For each mergeLevel (from lowest to highest, e.g. Hourly -> Daily -> Monthly -> Yearly):
  *   - Skip scheduling if there's incomplete task for the mergeLevel
- *   - Calculate merge/rollup window:
- *     - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode
- *       found at MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>
- *       In case of cold-start, no ZNode will exist.
- *       A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
- *       closest bucket start time.
- *     - The execution window for the task is calculated as,
- *       windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
- *     - Skip scheduling if the window is invalid:
- *       - If the execution window is not older than bufferTimeMs, no task will be generated
- *       - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
- *     - Bump up target window and watermark if needed.
- *       - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *         current window,
- *         keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *   - Select all segments for the target window
- *   - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *   - Schedule tasks for k time buckets, k is up to numParallelTasks at best effort
+ *   - For each time bucket:

Review comment:
I think that `For each time bucket` is a bit confusing, given that we don't have a loop over time buckets in the code. Instead, let's improve this line (e.g. `Repeat until k tasks get created or we have looped through all the candidate segments`).

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -65,36 +66,44 @@
  *
  * - Pre-select segments:
  *   - Fetch all segments, select segments based on segment lineage (removing segmentsFrom for COMPLETED lineage
- *     entry and
- *     segmentsTo for IN_PROGRESS lineage entry)
+ *     entry and segmentsTo for IN_PROGRESS lineage entry)
  *   - Remove empty segments
  *   - Sort segments based on startTime and endTime in ascending order
  *
  * For each mergeLevel (from lowest to highest, e.g. Hourly -> Daily -> Monthly -> Yearly):
  *   - Skip scheduling if there's incomplete task for the mergeLevel
- *   - Calculate merge/rollup window:
- *     - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode
- *       found at MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>
- *       In case of cold-start, no ZNode will exist.
- *       A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
- *       closest bucket start time.
- *     - The execution window for the task is calculated as,
- *       windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
- *     - Skip scheduling if the window is invalid:
- *       - If the execution window is not older than bufferTimeMs, no task will be generated
- *       - The windowEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
- *     - Bump up target window and watermark if needed.
- *       - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
- *         current window,
- *         keep bumping up the watermark and target window until unmerged segments are found. Else skip the scheduling.
- *   - Select all segments for the target window
- *   - Create tasks (per partition for partitioned table) based on maxNumRecordsPerTask
+ *   - Schedule tasks for k time buckets, k is up to numParallelTasks at best effort
+ *   - For each time bucket:
+ *     - Calculate merge/roll-up window:
+ *       - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode
+ *         found at MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>
+ *         In case of cold-start, no ZNode will exist.
+ *         A new ZNode will be created, with watermarkMs as the smallest time found in all segments truncated to the
+ *         closest bucket start time.
+ *       - The execution window for the task is calculated as,
+ *         bucketStartMs = watermarkMs
+ *         bucketEndMs = windowStartMs + bucketTimeMs
+ *       - bucketEndMs must be equal or older than the bufferTimeMs
+ *       - bucketEndMs of higher merge level should be less or equal to the waterMarkMs of lower level
+ *       - Skip scheduling if the window is invalid (windowEndMs <= windowStartMs)
+ *       - Bump up target window and watermark if needed.
+ *         - If there's no unmerged segments (by checking segment zk metadata {mergeRollupTask.mergeLevel: level}) for
+ *           current window, keep bumping up the watermark and target window until unmerged segments are found.
+ *           Else skip the scheduling.
+ *     - Select segments for the bucket:
+ *       - Skip buckets which all segments are merged
+ *       - If there's no spilled over segments (segments spanning multiple time buckets),
+ *         schedule all buckets in parallel
+ *       - Else, schedule buckets till the first one that has spilled over data (included), so the spilled over data
+ *         will be merged next round
+ *     - Create tasks per bucket (and per partition for partitioned table) based on maxNumRecordsPerTask

Review comment:
Create the task for the current bucket.

##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -212,106 +221,129 @@ public String getTaskType() {
         continue;
       }
 
-      // Get the bucket size and buffer
+      // Get the bucket size, buffer and number of parallel buckets (1 as default)
       long bucketMs =
-          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
       if (bucketMs <= 0) {
         LOGGER.error("Bucket time period (table : {}, mergeLevel : {}) must be larger than 0", offlineTableName,
             mergeLevel);
         continue;
       }
       long bufferMs =
-          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+          TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+      int numParallelBuckets = mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null
+          ? Integer.parseInt(mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS)) : DEFAULT_NUM_PARALLEL_BUCKETS;
       // Get watermark from MergeRollupTaskMetadata ZNode
-      // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+      // bucketStartMs = watermarkMs
+      // bucketEndMs = bucketStartMs + bucketTimeMs
       long waterMarkMs =
           getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel, mergeRollupTaskMetadata);
-      long windowStartMs = waterMarkMs;
-      long windowEndMs = windowStartMs + bucketMs;
-
-      if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
-        LOGGER.info("Window with start: {} and end: {} of mergeLevel: {} is not a valid merge window, Skipping task "
-            + "generation: {}", windowStartMs, windowEndMs, mergeLevel, taskType);
+      long bucketStartMs = waterMarkMs;
+      long bucketEndMs = getBucketEndTime(bucketStartMs, bucketMs, bufferMs, lowerMergeLevel,
+          mergeRollupTaskMetadata);
+
+      if (!isValidBucket(bucketStartMs, bucketEndMs)) {
+        LOGGER.info(
+            "Bucket with start: {} and end: {} of mergeLevel: {} is not a valid merge window, Skipping task "
+                + "generation: {}",
+            bucketStartMs, bucketEndMs, mergeLevel, taskType);
         continue;
       }
 
-      // Find all segments overlapping with the merge window, if all overlapping segments are merged, bump up the
-      // target window
-      List<SegmentZKMetadata> selectedSegments = new ArrayList<>();
+      //

Review comment:
Was this comment deleted by accident? Please address this :)
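As an aside for readers following this hunk: per the Javadoc changes earlier in this review, the new `getBucketEndTime`/`isValidBucket` pair encodes three window rules. Here is a small distillation of those rules (my own sketch under the stated rules, not the PR's actual implementation; the class and method names are illustrative):
```java
final class MergeWindowRules {

  // A bucket [bucketStartMs, bucketEndMs) is schedulable when:
  //   1. the window has positive length,
  //   2. the window end is at least bufferMs older than now, and
  //   3. a higher merge level does not pass the lower level's watermark
  //      (callers pass Long.MAX_VALUE when there is no lower merge level).
  static boolean isSchedulableBucket(long bucketStartMs, long bucketEndMs, long bufferMs,
      long lowerLevelWatermarkMs) {
    return bucketEndMs > bucketStartMs
        && bucketEndMs <= System.currentTimeMillis() - bufferMs
        && bucketEndMs <= lowerLevelWatermarkMs;
  }
}
```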