snleee commented on a change in pull request #7481:
URL: https://github.com/apache/pinot/pull/7481#discussion_r725438368



##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
##########
@@ -270,9 +270,121 @@ public void testMaxNumRecordsPerTask() {
         .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3));
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
     assertEquals(pinotTaskConfigs.size(), 2);
-    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + 
"," + segmentName2, "daily", "concat",
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + 
"," + segmentName2, DAILY, "concat",
         "1d", null, "1000000");
-    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, 
"daily", "concat", "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, 
DAILY, "concat", "1d", null, "1000000");
+  }
+
+  /**
+   * Test num parallel buckets
+   */
+  @Test
+  public void testNumParallelBuckets() {
+    Map<String, Map<String, String>> taskConfigsMap = new HashMap<>();
+    Map<String, String> tableTaskConfigs = new HashMap<>();
+    tableTaskConfigs.put("daily.mergeType", "concat");
+    tableTaskConfigs.put("daily.bufferTimePeriod", "2d");
+    tableTaskConfigs.put("daily.bucketTimePeriod", "1d");
+    tableTaskConfigs.put("daily.numParallelBuckets", "3");
+    tableTaskConfigs.put("daily.maxNumRecordsPerSegment", "1000000");
+    taskConfigsMap.put(MinionConstants.MergeRollupTask.TASK_TYPE, 
tableTaskConfigs);
+    TableConfig offlineTableConfig = getOfflineTableConfig(taskConfigsMap);
+    ClusterInfoAccessor mockClusterInfoProvide = 
mock(ClusterInfoAccessor.class);
+
+    String segmentName1 = "testTable__1";
+    String segmentName2 = "testTable__2";
+    String segmentName3 = "testTable__3";
+    String segmentName4 = "testTable__4";
+    String segmentName5 = "testTable__5";
+    SegmentZKMetadata metadata1 =
+        getSegmentZKMetadata(segmentName1, 86_400_000L, 90_000_000L, 
TimeUnit.MILLISECONDS, "download1");
+    SegmentZKMetadata metadata2 =
+        getSegmentZKMetadata(segmentName2, 86_400_000L, 100_000_000L, 
TimeUnit.MILLISECONDS, "download2");
+    SegmentZKMetadata metadata3 =
+        getSegmentZKMetadata(segmentName3, 172_800_000L, 173_000_000L, 
TimeUnit.MILLISECONDS, "download3");
+    SegmentZKMetadata metadata4 =
+        getSegmentZKMetadata(segmentName4, 259_200_000L, 260_000_000L, 
TimeUnit.MILLISECONDS, "download4");
+    SegmentZKMetadata metadata5 =
+        getSegmentZKMetadata(segmentName5, 345_600_000L, 346_000_000L, 
TimeUnit.MILLISECONDS, "download5");
+
+    // No spilled over data
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, 
metadata4, metadata5));
+    MergeRollupTaskGenerator generator = new MergeRollupTaskGenerator();
+    generator.init(mockClusterInfoProvide);
+    List<PinotTaskConfig> pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + 
"," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3, 
DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName4, 
DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has spilled over data
+    String segmentName6 = "testTable__6";
+    SegmentZKMetadata metadata6 =
+        getSegmentZKMetadata(segmentName6, 172_800_000L, 260_000_000L, 
TimeUnit.MILLISECONDS, null);
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata3, 
metadata4, metadata5, metadata6));
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 2);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + 
"," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName3 + 
"," + segmentName6, DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Has time bucket without overlapping segments
+    when(mockClusterInfoProvide.getSegmentsZKMetadata(OFFLINE_TABLE_NAME))
+        .thenReturn(Lists.newArrayList(metadata1, metadata2, metadata4, 
metadata5));
+    pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
+    assertEquals(pinotTaskConfigs.size(), 3);
+    checkPinotTaskConfig(pinotTaskConfigs.get(0).getConfigs(), segmentName1 + 
"," + segmentName2, DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(1).getConfigs(), segmentName4, 
DAILY, "concat",
+        "1d", null, "1000000");
+    checkPinotTaskConfig(pinotTaskConfigs.get(2).getConfigs(), segmentName5, 
DAILY, "concat",
+        "1d", null, "1000000");
+
+    // Heading buckets are merged

Review comment:
       For completeness, can we cover the case where we have a non-merged 
segment in the middle? (e.g. a middle task failed while we use parallel scheduling)
   
   Here is the case that you described (we can add a test case for a similar 
scenario):
   
   ```
   For example:
   8/10, 8/11, 8/12, 8/13, 8/14 and parallelism = 3
   First round we schedule 8/10, 8/11, 8/12. 8/10 is stuck over 24 hours, and 
8/11, 8/12 are merged successfully.
   Second round the scheduler will pick 8/10, 8/13 and 8/14 and skip 8/11 and 
8/12.
   ```

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -212,106 +221,129 @@ public String getTaskType() {
           continue;
         }
 
-        // Get the bucket size and buffer
+        // Get the bucket size, buffer and number of parallel buckets (1 as 
default)
         long bucketMs =
-            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
         if (bucketMs <= 0) {
           LOGGER.error("Bucket time period (table : {}, mergeLevel : {}) must 
be larger than 0", offlineTableName,
               mergeLevel);
           continue;
         }
         long bufferMs =
-            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+        int numParallelBuckets = 
mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null

Review comment:
       Let's add the precondition check 
`Preconditions.checkState(numParallelBuckets > 0, "numParallelBuckets has to be 
larger than 0")` since we later access `selectedSegmentsForAllBuckets.get(0)` 
with the assumption that the size of `selectedSegmentsForAllBuckets` is always 
larger than 0. 

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -65,36 +66,44 @@
  *
  *  - Pre-select segments:
  *    - Fetch all segments, select segments based on segment lineage (removing 
segmentsFrom for COMPLETED lineage
- *    entry and
- *      segmentsTo for IN_PROGRESS lineage entry)
+ *      entry and segmentsTo for IN_PROGRESS lineage entry)
  *    - Remove empty segments
  *    - Sort segments based on startTime and endTime in ascending order
  *
  *  For each mergeLevel (from lowest to highest, e.g. Hourly -> Daily -> 
Monthly -> Yearly):
  *    - Skip scheduling if there's incomplete task for the mergeLevel
- *    - Calculate merge/rollup window:
- *      - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode
- *        found at MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>
- *        In case of cold-start, no ZNode will exist.
- *        A new ZNode will be created, with watermarkMs as the smallest time 
found in all segments truncated to the
- *        closest bucket start time.
- *      - The execution window for the task is calculated as,
- *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + 
bucketTimeMs
- *      - Skip scheduling if the window is invalid:
- *        - If the execution window is not older than bufferTimeMs, no task 
will be generated
- *        - The windowEndMs of higher merge level should be less or equal to 
the waterMarkMs of lower level
- *      - Bump up target window and watermark if needed.
- *        - If there's no unmerged segments (by checking segment zk metadata 
{mergeRollupTask.mergeLevel: level}) for
- *        current window,
- *          keep bumping up the watermark and target window until unmerged 
segments are found. Else skip the scheduling.
- *    - Select all segments for the target window
- *    - Create tasks (per partition for partitioned table) based on 
maxNumRecordsPerTask
+ *    - Schedule tasks for k time buckets, k is up to numParallelTasks at best 
effort
+ *    - For each time bucket:

Review comment:
       I think that `For each time bucket` is a bit confusing given that we 
don't have the loop over time buckets within the code. Instead, let's improve 
this line (e.g. `Repeat until k tasks get created or we loop through all 
the candidate segments`).

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -65,36 +66,44 @@
  *
  *  - Pre-select segments:
  *    - Fetch all segments, select segments based on segment lineage (removing 
segmentsFrom for COMPLETED lineage
- *    entry and
- *      segmentsTo for IN_PROGRESS lineage entry)
+ *      entry and segmentsTo for IN_PROGRESS lineage entry)
  *    - Remove empty segments
  *    - Sort segments based on startTime and endTime in ascending order
  *
  *  For each mergeLevel (from lowest to highest, e.g. Hourly -> Daily -> 
Monthly -> Yearly):
  *    - Skip scheduling if there's incomplete task for the mergeLevel
- *    - Calculate merge/rollup window:
- *      - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode
- *        found at MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>
- *        In case of cold-start, no ZNode will exist.
- *        A new ZNode will be created, with watermarkMs as the smallest time 
found in all segments truncated to the
- *        closest bucket start time.
- *      - The execution window for the task is calculated as,
- *        windowStartMs = watermarkMs, windowEndMs = windowStartMs + 
bucketTimeMs
- *      - Skip scheduling if the window is invalid:
- *        - If the execution window is not older than bufferTimeMs, no task 
will be generated
- *        - The windowEndMs of higher merge level should be less or equal to 
the waterMarkMs of lower level
- *      - Bump up target window and watermark if needed.
- *        - If there's no unmerged segments (by checking segment zk metadata 
{mergeRollupTask.mergeLevel: level}) for
- *        current window,
- *          keep bumping up the watermark and target window until unmerged 
segments are found. Else skip the scheduling.
- *    - Select all segments for the target window
- *    - Create tasks (per partition for partitioned table) based on 
maxNumRecordsPerTask
+ *    - Schedule tasks for k time buckets, k is up to numParallelTasks at best 
effort
+ *    - For each time bucket:
+ *      - Calculate merge/roll-up window:
+ *        - Read watermarkMs from the {@link MergeRollupTaskMetadata} ZNode
+ *          found at MINION_TASK_METADATA/MergeRollupTask/<tableNameWithType>
+ *          In case of cold-start, no ZNode will exist.
+ *          A new ZNode will be created, with watermarkMs as the smallest time 
found in all segments truncated to the
+ *          closest bucket start time.
+ *        - The execution window for the task is calculated as,
+ *          bucketStartMs = watermarkMs
+ *          bucketEndMs = windowStartMs + bucketTimeMs
+ *          - bucketEndMs must be equal or older than the bufferTimeMs
+ *          - bucketEndMs of higher merge level should be less or equal to the 
waterMarkMs of lower level
+ *        - Skip scheduling if the window is invalid (windowEndMs <= 
windowStartMs)
+ *        - Bump up target window and watermark if needed.
+ *          - If there's no unmerged segments (by checking segment zk metadata 
{mergeRollupTask.mergeLevel: level}) for
+ *            current window, keep bumping up the watermark and target window 
until unmerged segments are found.
+ *            Else skip the scheduling.
+ *      - Select segments for the bucket:
+ *        - Skip buckets which all segments are merged
+ *        - If there's no spilled over segments (segments spanning multiple 
time buckets),
+ *          schedule all buckets in parallel
+ *        - Else, schedule buckets till the first one that has spilled over 
data (included), so the spilled over data
+ *          will be merged next round
+ *      - Create tasks per bucket (and per partition for partitioned table) 
based on maxNumRecordsPerTask

Review comment:
       Create the task for the current bucket.

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -212,106 +221,129 @@ public String getTaskType() {
           continue;
         }
 
-        // Get the bucket size and buffer
+        // Get the bucket size, buffer and number of parallel buckets (1 as 
default)
         long bucketMs =
-            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUCKET_TIME_PERIOD_KEY));
+            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUCKET_TIME_PERIOD_KEY));
         if (bucketMs <= 0) {
           LOGGER.error("Bucket time period (table : {}, mergeLevel : {}) must 
be larger than 0", offlineTableName,
               mergeLevel);
           continue;
         }
         long bufferMs =
-            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MinionConstants.MergeTask.BUFFER_TIME_PERIOD_KEY));
+            
TimeUtils.convertPeriodToMillis(mergeConfigs.get(MergeTask.BUFFER_TIME_PERIOD_KEY));
+        int numParallelBuckets = 
mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS) != null
+            ? 
Integer.parseInt(mergeConfigs.get(MergeRollupTask.NUM_PARALLEL_BUCKETS)) : 
DEFAULT_NUM_PARALLEL_BUCKETS;
 
         // Get watermark from MergeRollupTaskMetadata ZNode
-        // windowStartMs = watermarkMs, windowEndMs = windowStartMs + 
bucketTimeMs
+        // bucketStartMs = watermarkMs
+        // bucketEndMs = bucketStartMs + bucketTimeMs
         long waterMarkMs =
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), 
bucketMs, mergeLevel, mergeRollupTaskMetadata);
-        long windowStartMs = waterMarkMs;
-        long windowEndMs = windowStartMs + bucketMs;
-
-        if (!isValidMergeWindowEndTime(windowEndMs, bufferMs, lowerMergeLevel, 
mergeRollupTaskMetadata)) {
-          LOGGER.info("Window with start: {} and end: {} of mergeLevel: {} is 
not a valid merge window, Skipping task "
-              + "generation: {}", windowStartMs, windowEndMs, mergeLevel, 
taskType);
+        long bucketStartMs = waterMarkMs;
+        long bucketEndMs = getBucketEndTime(bucketStartMs, bucketMs, bufferMs, 
lowerMergeLevel,
+            mergeRollupTaskMetadata);
+
+        if (!isValidBucket(bucketStartMs, bucketEndMs)) {
+          LOGGER.info(
+              "Bucket with start: {} and end: {} of mergeLevel: {} is not a 
valid merge window, Skipping task "
+                  + "generation: {}",
+              bucketStartMs, bucketEndMs, mergeLevel, taskType);
           continue;
         }
 
-        // Find all segments overlapping with the merge window, if all 
overlapping segments are merged, bump up the
-        // target window
-        List<SegmentZKMetadata> selectedSegments = new ArrayList<>();
+        //

Review comment:
       Is this deleted by accident? Please address this :) 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to