This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7bcbda1 Use maxEndTimeMs for merge/roll-up delay metrics. (#7617) 7bcbda1 is described below commit 7bcbda1680922b97aa1ccaecd3b5eeaf75a87a02 Author: Jiapeng Tao <jia...@linkedin.com> AuthorDate: Fri Oct 22 20:07:09 2021 -0700 Use maxEndTimeMs for merge/roll-up delay metrics. (#7617) --- .../mergerollup/MergeRollupTaskGenerator.java | 40 ++++++++++++++-------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java index e81ee4e..3b59a6f 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java @@ -110,21 +110,19 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { // number to be 7 and merge task is configured with "bucketTimePeriod = 1d", this means that we have 7 days of // delay. When operating merge/roll-up task in production, we should set the alert on this metrics to find out the // delay. Setting the alert on 7 time buckets of delay would be a good starting point. - // - // NOTE: Based on the current scheduler logic, we are bumping up the watermark with some delay. (the current round - // will bump up the watermark for the window that got processed from the previous round). Due to this, we will - // correctly report the delay with one edge case. When we processed all available time windows, the watermark - // will not get bumped up until we schedule some task for the table. 
Due to this, we will always see the delay >= 1. private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS = "mergeRollupTaskDelayInNumBuckets"; // tableNameWithType -> mergeLevel -> watermarkMs private Map<String, Map<String, Long>> _mergeRollupWatermarks; + // tableNameWithType -> maxEndTime + private Map<String, Long> _tableMaxEndTimeMs; private ClusterInfoAccessor _clusterInfoAccessor; @Override public void init(ClusterInfoAccessor clusterInfoAccessor) { _clusterInfoAccessor = clusterInfoAccessor; _mergeRollupWatermarks = new HashMap<>(); + _tableMaxEndTimeMs = new HashMap<>(); } @Override @@ -254,7 +252,12 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { long bucketEndMs = bucketStartMs + bucketMs; // Create delay metrics even if there's no task scheduled, this helps the case that the controller is restarted // but the metrics are not available until the controller schedules a valid task - createOrUpdateDelayMetrics(offlineTableName, mergeLevel, watermarkMs, bufferMs, bucketMs); + long maxEndTimeMs = Long.MIN_VALUE; + for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) { + maxEndTimeMs = Math.max(maxEndTimeMs, preSelectedSegment.getEndTimeMs()); + } + createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, maxEndTimeMs, + bufferMs, bucketMs); if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) { LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet", bucketStartMs, bucketEndMs, offlineTableName, mergeLevel); @@ -339,7 +342,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { watermarkMs, newWatermarkMs); // Update the delay metrics - createOrUpdateDelayMetrics(offlineTableName, mergeLevel, newWatermarkMs, bufferMs, bucketMs); + createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs, maxEndTimeMs, + bufferMs, bucketMs); // Create task configs int 
maxNumRecordsPerTask = @@ -542,11 +546,13 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { return pinotTaskConfigs; } - private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long maxEndTimeMsOfCurrentLevel, + long bufferTimeMs, long bucketTimeMs) { if (watermarkMs == -1) { return 0; } - return (System.currentTimeMillis() - watermarkMs - bufferTimeMs) / bucketTimeMs; + return (Math.min(System.currentTimeMillis() - bufferTimeMs, maxEndTimeMsOfCurrentLevel) - watermarkMs) + / bucketTimeMs; } /** @@ -555,12 +561,14 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { * * @param tableNameWithType table name with type * @param mergeLevel merge level + * @param lowerMergeLevel lower merge level * @param watermarkMs current watermark value + * @param maxEndTimeMs max end time of all the segments for the table * @param bufferTimeMs buffer time * @param bucketTimeMs bucket time */ - private void createOrUpdateDelayMetrics(String tableNameWithType, String mergeLevel, long watermarkMs, - long bufferTimeMs, long bucketTimeMs) { + private void createOrUpdateDelayMetrics(String tableNameWithType, String mergeLevel, String lowerMergeLevel, + long watermarkMs, long maxEndTimeMs, long bufferTimeMs, long bucketTimeMs) { ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); if (controllerMetrics == null) { return; @@ -569,16 +577,20 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { // Update gauge value that indicates the delay in terms of the number of time buckets. 
Map<String, Long> watermarkForTable = _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>()); + _tableMaxEndTimeMs.put(tableNameWithType, maxEndTimeMs); watermarkForTable.compute(mergeLevel, (k, v) -> { if (v == null) { LOGGER.info( "Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}." + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType, mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs, - getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs)); + getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, lowerMergeLevel == null + ? _tableMaxEndTimeMs.get(tableNameWithType) : watermarkForTable.get(lowerMergeLevel), + bufferTimeMs, bucketTimeMs)); controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel), - (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), bufferTimeMs, - bucketTimeMs))); + (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), + lowerMergeLevel == null ? _tableMaxEndTimeMs.get(tableNameWithType) + : watermarkForTable.get(lowerMergeLevel), bufferTimeMs, bucketTimeMs))); } return watermarkMs; }); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org