jtao15 commented on a change in pull request #7368: URL: https://github.com/apache/pinot/pull/7368#discussion_r719788856
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +491,134 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (watermarkMs == -1) { + return 0; + } + return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / bucketTimeMs; + } + + private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}." Review comment: Move this line to `519`? We are creating the metric only when this function is called the first time for a give mergeLevel. ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +491,134 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (watermarkMs == -1) { + return 0; + } + return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / bucketTimeMs; + } + + private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs, Review comment: Better naming? This function updates the watermark map for emitting metrics purposes, we also set the watermark in zk metadata outside of this function. We can either put `mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, windowStartMs);` inside this function or rename this function. ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -90,15 +92,29 @@ */ @TaskGenerator public class MergeRollupTaskGenerator implements PinotTaskGenerator { + private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class); private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000; private static final String REFRESH = "REFRESH"; - private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class); + // This is the metric that keeps track of the task delay in the number of time buckets. For example, if we see this Review comment: Thanks for the detailed comments of the metric. ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +492,134 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (watermarkMs == -1) { + return 0; + } + return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / bucketTimeMs; + } + + private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info("Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}." + + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType, + mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs, + getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs)); + + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + + // Update gauge value that indicates the delay in terms of the number of time buckets. + Map<String, Long> watermarkForTable = + _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>()); + watermarkForTable.compute(mergeLevel, (k, v) -> { + if (v == null) { + controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel), + (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), bufferTimeMs, + bucketTimeMs))); + } + return watermarkMs; + }); + } + + /** + * Reset the watermark for the given table name + * @param tableNameWithType a table name with type + */ + private void resetWatermarkMs(String tableNameWithType) { + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + + // Delete all the watermarks associated with the given table name + Map<String, Long> watermarksForTable = _mergeRollupWatermarks.remove(tableNameWithType); + if (watermarksForTable != null) { + for (String mergeLevel : watermarksForTable.keySet()) { + controllerMetrics.removeGauge(getMetricNameForTaskDelay(tableNameWithType, mergeLevel)); + } + } + } + + /** + * Reset the watermerk for the given table name and merge level + * + * @param tableNameWithType table name with type + * @param mergeLevel merge level + */ + private void resetWatermarkMs(String tableNameWithType, String mergeLevel) { + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + + // Delete all the watermarks associated with the given table name Review comment: (nit) // Delete the watermark associated with the merge level of given table name -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org