This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bcbda1  Use maxEndTimeMs for merge/roll-up delay metrics. (#7617)
7bcbda1 is described below

commit 7bcbda1680922b97aa1ccaecd3b5eeaf75a87a02
Author: Jiapeng Tao <jia...@linkedin.com>
AuthorDate: Fri Oct 22 20:07:09 2021 -0700

    Use maxEndTimeMs for merge/roll-up delay metrics. (#7617)
---
 .../mergerollup/MergeRollupTaskGenerator.java      | 40 ++++++++++++++--------
 1 file changed, 26 insertions(+), 14 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index e81ee4e..3b59a6f 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -110,21 +110,19 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
   // number to be 7 and merge task is configured with "bucketTimePeriod = 1d", 
this means that we have 7 days of
   // delay. When operating merge/roll-up task in production, we should set the 
alert on this metrics to find out the
   // delay. Setting the alert on 7 time buckets of delay would be a good 
starting point.
-  //
-  // NOTE: Based on the current scheduler logic, we are bumping up the 
watermark with some delay. (the current round
-  // will bump up the watermark for the window that got processed from the 
previous round). Due to this, we will
-  // correctly report the delay with one edge case. When we processed all 
available time windows, the watermark
-  // will not get bumped up until we schedule some task for the table. Due to 
this, we will always see the delay >= 1.
   private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS = 
"mergeRollupTaskDelayInNumBuckets";
 
   // tableNameWithType -> mergeLevel -> watermarkMs
   private Map<String, Map<String, Long>> _mergeRollupWatermarks;
+  // tableNameWithType -> maxEndTime
+  private Map<String, Long> _tableMaxEndTimeMs;
   private ClusterInfoAccessor _clusterInfoAccessor;
 
   @Override
   public void init(ClusterInfoAccessor clusterInfoAccessor) {
     _clusterInfoAccessor = clusterInfoAccessor;
     _mergeRollupWatermarks = new HashMap<>();
+    _tableMaxEndTimeMs = new HashMap<>();
   }
 
   @Override
@@ -254,7 +252,12 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
         long bucketEndMs = bucketStartMs + bucketMs;
         // Create delay metrics even if there's no task scheduled, this helps 
the case that the controller is restarted
         // but the metrics are not available until the controller schedules a 
valid task
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, watermarkMs, 
bufferMs, bucketMs);
+        long maxEndTimeMs = Long.MIN_VALUE;
+        for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
+          maxEndTimeMs = Math.max(maxEndTimeMs, 
preSelectedSegment.getEndTimeMs());
+        }
+        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, 
watermarkMs, maxEndTimeMs,
+            bufferMs, bucketMs);
         if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, 
mergeRollupTaskMetadata)) {
           LOGGER.info("Bucket with start: {} and end: {} (table : {}, 
mergeLevel : {}) cannot be merged yet",
               bucketStartMs, bucketEndMs, offlineTableName, mergeLevel);
@@ -339,7 +342,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
             watermarkMs, newWatermarkMs);
 
         // Update the delay metrics
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, 
newWatermarkMs, bufferMs, bucketMs);
+        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, 
lowerMergeLevel, newWatermarkMs, maxEndTimeMs,
+            bufferMs, bucketMs);
 
         // Create task configs
         int maxNumRecordsPerTask =
@@ -542,11 +546,13 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
     return pinotTaskConfigs;
   }
 
-  private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long 
bufferTimeMs, long bucketTimeMs) {
+  private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long 
maxEndTimeMsOfCurrentLevel,
+      long bufferTimeMs, long bucketTimeMs) {
     if (watermarkMs == -1) {
       return 0;
     }
-    return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / 
bucketTimeMs;
+    return (Math.min(System.currentTimeMillis() - bufferTimeMs, 
maxEndTimeMsOfCurrentLevel) - watermarkMs)
+        / bucketTimeMs;
   }
 
   /**
@@ -555,12 +561,14 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
    *
    * @param tableNameWithType table name with type
    * @param mergeLevel merge level
+   * @param lowerMergeLevel lower merge level
    * @param watermarkMs current watermark value
+   * @param maxEndTimeMs max end time of all the segments for the table
    * @param bufferTimeMs buffer time
    * @param bucketTimeMs bucket time
    */
-  private void createOrUpdateDelayMetrics(String tableNameWithType, String 
mergeLevel, long watermarkMs,
-      long bufferTimeMs, long bucketTimeMs) {
+  private void createOrUpdateDelayMetrics(String tableNameWithType, String 
mergeLevel, String lowerMergeLevel,
+      long watermarkMs, long maxEndTimeMs, long bufferTimeMs, long 
bucketTimeMs) {
     ControllerMetrics controllerMetrics = 
_clusterInfoAccessor.getControllerMetrics();
     if (controllerMetrics == null) {
       return;
@@ -569,16 +577,20 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator {
     // Update gauge value that indicates the delay in terms of the number of 
time buckets.
     Map<String, Long> watermarkForTable =
         _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new 
ConcurrentHashMap<>());
+    _tableMaxEndTimeMs.put(tableNameWithType, maxEndTimeMs);
     watermarkForTable.compute(mergeLevel, (k, v) -> {
       if (v == null) {
         LOGGER.info(
             "Creating the gauge metric for tracking the merge/roll-up task 
delay for table: {} and mergeLevel: {}."
                 + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, 
taskDelayInNumTimeBuckets={})", tableNameWithType,
             mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
-            getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, 
bucketTimeMs));
+            getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, 
lowerMergeLevel == null
+                    ? _tableMaxEndTimeMs.get(tableNameWithType) : 
watermarkForTable.get(lowerMergeLevel),
+                bufferTimeMs, bucketTimeMs));
         
controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType,
 mergeLevel),
-            (() -> 
getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), 
bufferTimeMs,
-                bucketTimeMs)));
+            (() -> 
getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L),
+                lowerMergeLevel == null ? 
_tableMaxEndTimeMs.get(tableNameWithType)
+                    : watermarkForTable.get(lowerMergeLevel), bufferTimeMs, 
bucketTimeMs)));
       }
       return watermarkMs;
     });

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to