Jackie-Jiang commented on a change in pull request #7368: URL: https://github.com/apache/pinot/pull/7368#discussion_r718876304
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +490,71 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0 || watermarkMs == -1) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); Review comment: This is equivalent to: ```suggestion return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / bucketTimeMs; ``` ########## File path: pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java ########## @@ -504,7 +504,7 @@ private void removeGauge(final String gaugeName) { * Remove callback gauge. * @param metricName metric name */ - private void removeCallbackGauge(String metricName) { + public void removeCallbackGauge(String metricName) { Review comment: Should we expose `removeGauge()` instead of this one? ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -339,6 +351,21 @@ public String getTaskType() { .info("Finished generating task configs for table: {} for task: {}, numTasks: {}", offlineTableName, taskType, pinotTaskConfigsForTable.size()); } + + // Reset watermarks for invalid tables. This covers the metrics clean up when the table is removed or the merge + // config is added and then removed. 
+ LeadControllerManager leadControllerManager = _clusterInfoAccessor.getLeaderControllerManager(); + List<String> cleanUpCandidates = new ArrayList<>(); + for (String tableNameWithType : _mergeRollupWatermarks.keySet()) { + if (!candidateMergeTables.contains(tableNameWithType) || !leadControllerManager Review comment: (Critical) This will remove all the metrics when someone triggers an adhoc task which only generates tasks for one table ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +490,71 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0 || watermarkMs == -1) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); + } + + private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info( + "Setting watermark for table: {} and mergeLevel: {} is {} (watermarkMs={}, bufferTimeMs={}, bucketTimeMs={})", + tableNameWithType, mergeLevel, getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs), + watermarkMs, bucketTimeMs, bucketTimeMs); + + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + + // Update gauge value that indicates the delay in terms of the number of time buckets. 
+ _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new HashMap<>()); + Map<String, Long> watermarkForTable = _mergeRollupWatermarks.get(tableNameWithType); Review comment: The `watermarkForTable` needs to be a concurrent map because it can be read and written at the same time ```suggestion Map<String, Long> watermarkForTable = _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>()); ``` ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +490,71 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0 || watermarkMs == -1) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); + } + + private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info( + "Setting watermark for table: {} and mergeLevel: {} is {} (watermarkMs={}, bufferTimeMs={}, bucketTimeMs={})", + tableNameWithType, mergeLevel, getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs), + watermarkMs, bucketTimeMs, bucketTimeMs); + + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + + // Update gauge value that indicates the delay in terms of the number of time buckets. 
+ _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new HashMap<>()); + Map<String, Long> watermarkForTable = _mergeRollupWatermarks.get(tableNameWithType); + + watermarkForTable.compute(mergeLevel, (k, v) -> { + if (v == null) { + controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel), + (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), bufferTimeMs, + bucketTimeMs))); + } + return watermarkMs; + }); + } + + /** + * Reset the watermark for the given table name + * @param tableNameWithType a table name with type + */ + private void resetWatermarkMs(String tableNameWithType) { + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { Review comment: Do we need to check this when `ControllerMetrics` is already present? ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +490,71 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0 || watermarkMs == -1) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); + } + + private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info( + "Setting watermark for table: {} and mergeLevel: {} is {} (watermarkMs={}, bufferTimeMs={}, bucketTimeMs={})", Review comment: Suggest revising this sentence. 
We log the delay instead of the watermark ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -277,6 +288,7 @@ public String getTaskType() { } Long prevWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, windowStartMs); + setWatermarkMs(offlineTableName, mergeLevel, waterMarkMs, bufferMs, bucketMs); Review comment: The third argument should be `windowStartMs` instead of `watermarkMs`. `watermarkMs` is from the previous run, which might be lower than the `windowStartMs` ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +490,71 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0 || watermarkMs == -1) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); + } + + private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info( + "Setting watermark for table: {} and mergeLevel: {} is {} (watermarkMs={}, bufferTimeMs={}, bucketTimeMs={})", + tableNameWithType, mergeLevel, getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs), + watermarkMs, bucketTimeMs, bucketTimeMs); + + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { Review comment: Do we need this check? 
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -92,13 +96,17 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000; private static final String REFRESH = "REFRESH"; + private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS = "mergeRollupTaskDelayInNumBuckets"; private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class); + // tableNameWithType -> mergeLevel -> watermarkMs + private Map<String, Map<String, Long>> _mergeRollupWatermarks; private ClusterInfoAccessor _clusterInfoAccessor; @Override public void init(ClusterInfoAccessor clusterInfoAccessor) { _clusterInfoAccessor = clusterInfoAccessor; + _mergeRollupWatermarks = new ConcurrentHashMap<>(); Review comment: We don't need a concurrent map because all the task generators are single-threaded ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +490,71 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0 || watermarkMs == -1) { Review comment: Why do we want to skip when buffer time is 0? 
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +490,71 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0 || watermarkMs == -1) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); + } + + private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info( + "Setting watermark for table: {} and mergeLevel: {} is {} (watermarkMs={}, bufferTimeMs={}, bucketTimeMs={})", + tableNameWithType, mergeLevel, getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs), + watermarkMs, bucketTimeMs, bucketTimeMs); + + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + + // Update gauge value that indicates the delay in terms of the number of time buckets. 
+ _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new HashMap<>()); + Map<String, Long> watermarkForTable = _mergeRollupWatermarks.get(tableNameWithType); + + watermarkForTable.compute(mergeLevel, (k, v) -> { + if (v == null) { + controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel), + (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), bufferTimeMs, + bucketTimeMs))); + } + return watermarkMs; + }); + } + + /** + * Reset the watermark for the given table name + * @param tableNameWithType a table name with type + */ + private void resetWatermarkMs(String tableNameWithType) { + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + + // Delete all the watermarks associated with the given table name + Map<String, Long> watermarksForTable = _mergeRollupWatermarks.get(tableNameWithType); Review comment: (nit) Directly call `_mergeRollupWatermarks.remove(tableNameWithType);` ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +490,71 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0 || watermarkMs == -1) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); Review comment: The watermark here is the window start time. Not sure if you want to count the window itself. 
The current logic will always have delay >= 1 ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -94,6 +96,9 @@ private static final String REFRESH = "REFRESH"; private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class); + // Metrics + private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS = "mergeRollupTaskDelayInNumBuckets"; Review comment: Please add some comments explaining the meaning of this metric -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org