This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 47e49ec  Use valid bucket end time instead of segment end time for 
merge/rollup delay metrics. This handles the corner case where the metric takes 
segments that are not yet ready to merge into consideration. (#7827)
47e49ec is described below

commit 47e49ecd6e11aebe74f8868cdac22051b175d4c5
Author: Jiapeng Tao <jia...@linkedin.com>
AuthorDate: Wed Dec 1 18:27:06 2021 -0800

    Use valid bucket end time instead of segment end time for merge/rollup 
delay metrics. This handles the corner case where the metric takes segments 
that are not yet ready to merge into consideration. (#7827)
---
 .../mergerollup/MergeRollupTaskGenerator.java      | 55 +++++++++++++++-------
 1 file changed, 37 insertions(+), 18 deletions(-)

diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index ece07be..1f81011 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -114,15 +114,15 @@ public class MergeRollupTaskGenerator implements 
PinotTaskGenerator {
 
   // tableNameWithType -> mergeLevel -> watermarkMs
   private Map<String, Map<String, Long>> _mergeRollupWatermarks;
-  // tableNameWithType -> maxEndTime
-  private Map<String, Long> _tableMaxEndTimeMs;
+  // tableNameWithType -> maxValidBucketEndTime
+  private Map<String, Long> _tableMaxValidBucketEndTimeMs;
   private ClusterInfoAccessor _clusterInfoAccessor;
 
   @Override
   public void init(ClusterInfoAccessor clusterInfoAccessor) {
     _clusterInfoAccessor = clusterInfoAccessor;
     _mergeRollupWatermarks = new HashMap<>();
-    _tableMaxEndTimeMs = new HashMap<>();
+    _tableMaxValidBucketEndTimeMs = new HashMap<>();
   }
 
   @Override
@@ -252,15 +252,13 @@ public class MergeRollupTaskGenerator implements 
PinotTaskGenerator {
         long bucketEndMs = bucketStartMs + bucketMs;
         // Create delay metrics even if there's no task scheduled, this helps 
the case that the controller is restarted
         // but the metrics are not available until the controller schedules a 
valid task
-        long maxEndTimeMs = Long.MIN_VALUE;
+        long maxValidBucketEndTimeMs = Long.MIN_VALUE;
         for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
-          long currentEndTimeMs = preSelectedSegment.getEndTimeMs();
-          // Compute maxEndTimeMs among segments that are valid for merge
-          if (currentEndTimeMs < System.currentTimeMillis() - bufferMs) {
-            maxEndTimeMs = Math.max(maxEndTimeMs, currentEndTimeMs);
-          }
+          // Compute maxValidBucketEndTimeMs among segments that are ready for 
merge
+          long currentValidBucketEndTimeMs = 
getValidBucketEndTimeMsForSegment(preSelectedSegment, bucketMs, bufferMs);
+          maxValidBucketEndTimeMs = Math.max(maxValidBucketEndTimeMs, 
currentValidBucketEndTimeMs);
         }
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, 
watermarkMs, maxEndTimeMs,
+        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, 
watermarkMs, maxValidBucketEndTimeMs,
             bufferMs, bucketMs);
         if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, 
mergeRollupTaskMetadata)) {
           LOGGER.info("Bucket with start: {} and end: {} (table : {}, 
mergeLevel : {}) cannot be merged yet",
@@ -346,8 +344,8 @@ public class MergeRollupTaskGenerator implements 
PinotTaskGenerator {
             watermarkMs, newWatermarkMs);
 
         // Update the delay metrics
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, 
lowerMergeLevel, newWatermarkMs, maxEndTimeMs,
-            bufferMs, bucketMs);
+        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, 
lowerMergeLevel, newWatermarkMs,
+            maxValidBucketEndTimeMs, bufferMs, bucketMs);
 
         // Create task configs
         int maxNumRecordsPerTask =
@@ -440,6 +438,28 @@ public class MergeRollupTaskGenerator implements 
PinotTaskGenerator {
   }
 
   /**
+   * Get the valid bucket end time before the buffer (now - bufferMs). Consider the segment as multiple contiguous
+   * time buckets, this function will return the last bucket end time before the buffer. Return Long.MIN_VALUE if
+   * there's no valid bucket before the buffer.
+   */
+  private long getValidBucketEndTimeMsForSegment(SegmentZKMetadata segmentZKMetadata, long bucketMs, long bufferMs) {
+    // Make sure the segment is ready for merge (the first bucket <= now - bufferTime)
+    long currentTimeMs = System.currentTimeMillis();
+    long firstBucketEndTimeMs = segmentZKMetadata.getStartTimeMs() / bucketMs * bucketMs + bucketMs;
+    if (firstBucketEndTimeMs > currentTimeMs - bufferMs) {
+      return Long.MIN_VALUE;
+    }
+    // The validBucketEndTime is calculated as the min(segment end time, now - bufferTime) rounded to the bucket
+    // boundary. The buffer (not the bucket size) is subtracted from now, matching the readiness check above.
+    // Notice bucketEndTime is exclusive while segment end time is inclusive. E.g. if bucketTime = 1d,
+    // the rounded segment end time of [10/1 00:00, 10/1 23:59] is 10/2 00:00. The rounded segment end time of
+    // [10/1 00:00, 10/2 00:00] is 10/3 00:00
+    long validBucketEndTimeMs = (segmentZKMetadata.getEndTimeMs() / bucketMs + 1) * bucketMs;
+    validBucketEndTimeMs = Math.min(validBucketEndTimeMs, (currentTimeMs - bufferMs) / bucketMs * bucketMs);
+    return validBucketEndTimeMs;
+  }
+
+  /**
    * Check if the segment span multiple buckets
    */
   private boolean hasSpilledOverData(SegmentZKMetadata segmentZKMetadata, long 
bucketMs) {
@@ -562,17 +582,16 @@ public class MergeRollupTaskGenerator implements 
PinotTaskGenerator {
   /**
    * Update the delay metrics for the given table and merge level. We create 
the new gauge metric if the metric is not
    * available.
-   *
    * @param tableNameWithType table name with type
    * @param mergeLevel merge level
    * @param lowerMergeLevel lower merge level
    * @param watermarkMs current watermark value
-   * @param maxEndTimeMs max end time of all the segments for the table
+   * @param maxValidBucketEndTimeMs max valid bucket end time of all the 
segments for the table
    * @param bufferTimeMs buffer time
    * @param bucketTimeMs bucket time
    */
   private void createOrUpdateDelayMetrics(String tableNameWithType, String 
mergeLevel, String lowerMergeLevel,
-      long watermarkMs, long maxEndTimeMs, long bufferTimeMs, long 
bucketTimeMs) {
+      long watermarkMs, long maxValidBucketEndTimeMs, long bufferTimeMs, long 
bucketTimeMs) {
     ControllerMetrics controllerMetrics = 
_clusterInfoAccessor.getControllerMetrics();
     if (controllerMetrics == null) {
       return;
@@ -581,7 +600,7 @@ public class MergeRollupTaskGenerator implements 
PinotTaskGenerator {
     // Update gauge value that indicates the delay in terms of the number of 
time buckets.
     Map<String, Long> watermarkForTable =
         _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new 
ConcurrentHashMap<>());
-    _tableMaxEndTimeMs.put(tableNameWithType, maxEndTimeMs);
+    _tableMaxValidBucketEndTimeMs.put(tableNameWithType, 
maxValidBucketEndTimeMs);
     watermarkForTable.compute(mergeLevel, (k, v) -> {
       if (v == null) {
         LOGGER.info(
@@ -589,11 +608,11 @@ public class MergeRollupTaskGenerator implements 
PinotTaskGenerator {
                 + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, 
taskDelayInNumTimeBuckets={})", tableNameWithType,
             mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
             getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, 
lowerMergeLevel == null
-                    ? _tableMaxEndTimeMs.get(tableNameWithType) : 
watermarkForTable.get(lowerMergeLevel),
+                    ? _tableMaxValidBucketEndTimeMs.get(tableNameWithType) : 
watermarkForTable.get(lowerMergeLevel),
                 bufferTimeMs, bucketTimeMs));
         
controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType,
 mergeLevel),
             (() -> 
getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L),
-                lowerMergeLevel == null ? 
_tableMaxEndTimeMs.get(tableNameWithType)
+                lowerMergeLevel == null ? 
_tableMaxValidBucketEndTimeMs.get(tableNameWithType)
                     : watermarkForTable.get(lowerMergeLevel), bufferTimeMs, 
bucketTimeMs)));
       }
       return watermarkMs;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to