jtao15 commented on a change in pull request #7481: URL: https://github.com/apache/pinot/pull/7481#discussion_r726452828
##########
File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -212,106 +221,129 @@ public String getTaskType() {
           continue;
         }
 
-        // Get the bucket size and buffer
+        // Get the bucket size, buffer and number of parallel buckets (1 as default)
         long bucketMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
         if (bucketMs <= 0) {
           LOGGER.error("Bucket time period (table : {}, mergeLevel : {}) must be larger than 0", offlineTableName,
               mergeLevel);
           continue;
         }
         long bufferMs =
-            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+            TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+        int numParallelBuckets = mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null
+            ? Integer.parseInt(mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS)) : DEFAULT_NUM_PARALLEL_BUCKETS;
 
         // Get watermark from MergeRollupTaskMetadata ZNode
-        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + bucketTimeMs
+        // bucketStartMs = watermarkMs
+        // bucketEndMs = bucketStartMs + bucketTimeMs
         long waterMarkMs =
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel,
                 mergeRollupTaskMetadata);
-        long windowStartMs = waterMarkMs;
-        long windowEndMs = windowStartMs + bucketMs;
-
-        if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
-          LOGGER.info("Window with start: {} and end: {} of mergeLevel: {} is not a valid merge window, Skipping task "
-              + "generation: {}", windowStartMs, windowEndMs, mergeLevel, taskType);
+        long bucketStartMs = waterMarkMs;
+        long bucketEndMs = getBucketEndTime(bucketStartMs, bucketMs, bufferMs, lowerMergeLevel,
+            mergeRollupTaskMetadata);
+
+        if (!isValidBucket(bucketStartMs, bucketEndMs)) {
+          LOGGER.info(
+              "Bucket with start: {} and end: {} of mergeLevel: {} is not a valid merge window, Skipping task "
+                  + "generation: {}",
+              bucketStartMs, bucketEndMs, mergeLevel, taskType);
           continue;
         }
 
-        // Find all segments overlapping with the merge window, if all overlapping segments are merged, bump up the
-        // target window
-        List<SegmentZKMetadata> selectedSegments = new ArrayList<>();
+        //

Review comment:
       Updated the comment.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org