This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 26a83ba4f3 [bugfix] fix mergeRollupTask metrics (#9864)
26a83ba4f3 is described below

commit 26a83ba4f302b8b402d38d437f635ff1a4903767
Author: Haitao Zhang <hai...@startree.ai>
AuthorDate: Mon Nov 28 21:32:45 2022 -0800

    [bugfix] fix mergeRollupTask metrics (#9864)
    
    * [bugfix] fix mergeRollupTask metrics
    
    * fix a typo
    
    * add comments
    
    * fix comments
---
 .../mergerollup/MergeRollupTaskGenerator.java      | 43 ++++++++++++----------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 28043c2f60..fc7be90657 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -115,8 +115,8 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
 
   // tableNameWithType -> mergeLevel -> watermarkMs
   private final Map<String, Map<String, Long>> _mergeRollupWatermarks = new 
HashMap<>();
-  // tableNameWithType -> maxValidBucketEndTime
-  private final Map<String, Long> _tableMaxValidBucketEndTimeMs = new 
HashMap<>();
+  // tableNameWithType -> lowestLevelMaxValidBucketEndTime
+  private final Map<String, Long> _tableLowestLevelMaxValidBucketEndTimeMs = 
new HashMap<>();
 
   @Override
   public String getTaskType() {
@@ -244,16 +244,20 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), 
bucketMs, mergeLevel, mergeRollupTaskMetadata);
         long bucketStartMs = watermarkMs;
         long bucketEndMs = bucketStartMs + bucketMs;
+        if (lowerMergeLevel == null) {
+          long lowestLevelMaxValidBucketEndTimeMs = Long.MIN_VALUE;
+          for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
+            // Compute lowestLevelMaxValidBucketEndTimeMs among segments that 
are ready for merge
+            long currentValidBucketEndTimeMs =
+                getValidBucketEndTimeMsForSegment(preSelectedSegment, 
bucketMs, bufferMs);
+            lowestLevelMaxValidBucketEndTimeMs =
+                Math.max(lowestLevelMaxValidBucketEndTimeMs, 
currentValidBucketEndTimeMs);
+          }
+          _tableLowestLevelMaxValidBucketEndTimeMs.put(offlineTableName, 
lowestLevelMaxValidBucketEndTimeMs);
+        }
         // Create delay metrics even if there's no task scheduled, this helps 
the case that the controller is restarted
         // but the metrics are not available until the controller schedules a 
valid task
-        long maxValidBucketEndTimeMs = Long.MIN_VALUE;
-        for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
-          // Compute maxValidBucketEndTimeMs among segments that are ready for 
merge
-          long currentValidBucketEndTimeMs = 
getValidBucketEndTimeMsForSegment(preSelectedSegment, bucketMs, bufferMs);
-          maxValidBucketEndTimeMs = Math.max(maxValidBucketEndTimeMs, 
currentValidBucketEndTimeMs);
-        }
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, 
watermarkMs, maxValidBucketEndTimeMs,
-            bufferMs, bucketMs);
+        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, 
watermarkMs, bufferMs, bucketMs);
         if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, 
mergeRollupTaskMetadata)) {
           LOGGER.info("Bucket with start: {} and end: {} (table : {}, 
mergeLevel : {}) cannot be merged yet",
               bucketStartMs, bucketEndMs, offlineTableName, mergeLevel);
@@ -271,6 +275,9 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
         //    For each bucket find all segments overlapping with the target 
bucket, skip the bucket if all overlapping
         //    segments are merged. Schedule k (numParallelBuckets) buckets at 
most, and stops at the first bucket that
         //    contains spilled over data.
+        //    One may wonder how a segment with records spanning different 
buckets is handled. The short answer is that
+        //    it will be cut into multiple segments, each for a separate 
bucket. This is achieved by setting bucket time
+        //    period as PARTITION_BUCKET_TIME_PERIOD when generating 
PinotTaskConfigs
         // 2. There's no bucket with unmerged segments, skip scheduling
         for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
           long startTimeMs = preSelectedSegment.getStartTimeMs();
@@ -338,8 +345,7 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
             watermarkMs, newWatermarkMs);
 
         // Update the delay metrics
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, 
lowerMergeLevel, newWatermarkMs,
-            maxValidBucketEndTimeMs, bufferMs, bucketMs);
+        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, 
lowerMergeLevel, newWatermarkMs, bufferMs, bucketMs);
 
         // Create task configs
         int maxNumRecordsPerTask =
@@ -459,7 +465,7 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
     // the rounded segment end time of [10/1 00:00, 10/1 23:59] is 10/2 00:00. 
The rounded segment end time of
     // [10/1 00:00, 10/2 00:00] is 10/3 00:00
     long validBucketEndTimeMs = (segmentZKMetadata.getEndTimeMs() / bucketMs + 
1) * bucketMs;
-    validBucketEndTimeMs = Math.min(validBucketEndTimeMs, (currentTimeMs - 
bucketMs) / bucketMs * bucketMs);
+    validBucketEndTimeMs = Math.min(validBucketEndTimeMs, (currentTimeMs - 
bufferMs) / bucketMs * bucketMs);
     return validBucketEndTimeMs;
   }
 
@@ -603,12 +609,11 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
    * @param mergeLevel merge level
    * @param lowerMergeLevel lower merge level
    * @param watermarkMs current watermark value
-   * @param maxValidBucketEndTimeMs max valid bucket end time of all the 
segments for the table
    * @param bufferTimeMs buffer time
    * @param bucketTimeMs bucket time
    */
   private void createOrUpdateDelayMetrics(String tableNameWithType, String 
mergeLevel, String lowerMergeLevel,
-      long watermarkMs, long maxValidBucketEndTimeMs, long bufferTimeMs, long 
bucketTimeMs) {
+      long watermarkMs, long bufferTimeMs, long bucketTimeMs) {
     ControllerMetrics controllerMetrics = 
_clusterInfoAccessor.getControllerMetrics();
     if (controllerMetrics == null) {
       return;
@@ -617,19 +622,19 @@ public class MergeRollupTaskGenerator extends 
BaseTaskGenerator {
     // Update gauge value that indicates the delay in terms of the number of 
time buckets.
     Map<String, Long> watermarkForTable =
         _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new 
ConcurrentHashMap<>());
-    _tableMaxValidBucketEndTimeMs.put(tableNameWithType, 
maxValidBucketEndTimeMs);
     watermarkForTable.compute(mergeLevel, (k, v) -> {
       if (v == null) {
         LOGGER.info(
             "Creating the gauge metric for tracking the merge/roll-up task 
delay for table: {} and mergeLevel: {}."
                 + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, 
taskDelayInNumTimeBuckets={})", tableNameWithType,
-            mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
+            mergeLevel, watermarkMs, bufferTimeMs, bucketTimeMs,
             getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, 
lowerMergeLevel == null
-                    ? _tableMaxValidBucketEndTimeMs.get(tableNameWithType) : 
watermarkForTable.get(lowerMergeLevel),
+                    ? 
_tableLowestLevelMaxValidBucketEndTimeMs.get(tableNameWithType)
+                    : watermarkForTable.get(lowerMergeLevel),
                 bufferTimeMs, bucketTimeMs));
         
controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType,
 mergeLevel),
             (() -> 
getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L),
-                lowerMergeLevel == null ? 
_tableMaxValidBucketEndTimeMs.get(tableNameWithType)
+                lowerMergeLevel == null ? 
_tableLowestLevelMaxValidBucketEndTimeMs.get(tableNameWithType)
                     : watermarkForTable.get(lowerMergeLevel), bufferTimeMs, 
bucketTimeMs)));
       }
       return watermarkMs;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to