jtao15 commented on a change in pull request #7368:
URL: https://github.com/apache/pinot/pull/7368#discussion_r719788856



##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +491,134 @@ private long getWatermarkMs(long minStartTimeMs, long 
bucketMs, String mergeLeve
 
     return pinotTaskConfigs;
   }
+
+  private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long 
bufferTimeMs, long bucketTimeMs) {
+    if (watermarkMs == -1) {
+      return 0;
+    }
+    return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / 
bucketTimeMs;
+  }
+
+  private void setWatermarkMs(String tableNameWithType, String mergeLevel, 
long watermarkMs, long bufferTimeMs,
+      long bucketTimeMs) {
+    LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task 
delay for table: {} and mergeLevel: {}."

Review comment:
       Move this line to `519`? We are creating the metric only when this 
function is called the first time for a given mergeLevel.

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +491,134 @@ private long getWatermarkMs(long minStartTimeMs, long 
bucketMs, String mergeLeve
 
     return pinotTaskConfigs;
   }
+
+  private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long 
bufferTimeMs, long bucketTimeMs) {
+    if (watermarkMs == -1) {
+      return 0;
+    }
+    return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / 
bucketTimeMs;
+  }
+
+  private void setWatermarkMs(String tableNameWithType, String mergeLevel, 
long watermarkMs, long bufferTimeMs,

Review comment:
       Better naming? This function only updates the watermark map used for 
emitting metrics; we also set the watermark in the ZK metadata outside of this 
function. We can either put 
`mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, windowStartMs);` 
inside this function or rename this function.

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -90,15 +92,29 @@
  */
 @TaskGenerator
 public class MergeRollupTaskGenerator implements PinotTaskGenerator {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
   private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000;
   private static final String REFRESH = "REFRESH";
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(MergeRollupTaskGenerator.class);
 
+  // This is the metric that keeps track of the task delay in the number of 
time buckets. For example, if we see this

Review comment:
       Thanks for the detailed comments on the metric.

##########
File path: 
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
##########
@@ -463,4 +492,134 @@ private long getWatermarkMs(long minStartTimeMs, long 
bucketMs, String mergeLeve
 
     return pinotTaskConfigs;
   }
+
+  private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long 
bufferTimeMs, long bucketTimeMs) {
+    if (watermarkMs == -1) {
+      return 0;
+    }
+    return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / 
bucketTimeMs;
+  }
+
+  private void setWatermarkMs(String tableNameWithType, String mergeLevel, 
long watermarkMs, long bufferTimeMs,
+      long bucketTimeMs) {
+    LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task 
delay for table: {} and mergeLevel: {}."
+            + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, 
taskDelayInNumTimeBuckets={})", tableNameWithType,
+        mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
+        getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, 
bucketTimeMs));
+
+    ControllerMetrics controllerMetrics = 
_clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Update gauge value that indicates the delay in terms of the number of 
time buckets.
+    Map<String, Long> watermarkForTable =
+        _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new 
ConcurrentHashMap<>());
+    watermarkForTable.compute(mergeLevel, (k, v) -> {
+      if (v == null) {
+        
controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType,
 mergeLevel),
+            (() -> 
getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), 
bufferTimeMs,
+                bucketTimeMs)));
+      }
+      return watermarkMs;
+    });
+  }
+
+  /**
+   * Reset the watermark for the given table name
+   * @param tableNameWithType a table name with type
+   */
+  private void resetWatermarkMs(String tableNameWithType) {
+    ControllerMetrics controllerMetrics = 
_clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Delete all the watermarks associated with the given table name
+    Map<String, Long> watermarksForTable = 
_mergeRollupWatermarks.remove(tableNameWithType);
+    if (watermarksForTable != null) {
+      for (String mergeLevel : watermarksForTable.keySet()) {
+        
controllerMetrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType, 
mergeLevel));
+      }
+    }
+  }
+
+  /**
+   * Reset the watermerk for the given table name and merge level
+   *
+   * @param tableNameWithType table name with type
+   * @param mergeLevel merge level
+   */
+  private void resetWatermarkMs(String tableNameWithType, String mergeLevel) {
+    ControllerMetrics controllerMetrics = 
_clusterInfoAccessor.getControllerMetrics();
+    if (controllerMetrics == null) {
+      return;
+    }
+
+    // Delete all the watermarks associated with the given table name

Review comment:
       (nit) // Delete the watermark associated with the merge level of the 
given table name




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to