This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 47e49ec Use valid bucket end time instead of segment end time for merge/rollup delay metrics. This handles the corner case where the metric takes into consideration segments that are not ready to merge. (#7827) 47e49ec is described below commit 47e49ecd6e11aebe74f8868cdac22051b175d4c5 Author: Jiapeng Tao <jia...@linkedin.com> AuthorDate: Wed Dec 1 18:27:06 2021 -0800 Use valid bucket end time instead of segment end time for merge/rollup delay metrics. This handles the corner case where the metric takes into consideration segments that are not ready to merge. (#7827) --- .../mergerollup/MergeRollupTaskGenerator.java | 55 +++++++++++++++------- 1 file changed, 37 insertions(+), 18 deletions(-) diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java index ece07be..1f81011 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java @@ -114,15 +114,15 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { // tableNameWithType -> mergeLevel -> watermarkMs private Map<String, Map<String, Long>> _mergeRollupWatermarks; - // tableNameWithType -> maxEndTime - private Map<String, Long> _tableMaxEndTimeMs; + // tableNameWithType -> maxValidBucketEndTime + private Map<String, Long> _tableMaxValidBucketEndTimeMs; private ClusterInfoAccessor _clusterInfoAccessor; @Override public void init(ClusterInfoAccessor clusterInfoAccessor) { _clusterInfoAccessor = 
clusterInfoAccessor; _mergeRollupWatermarks = new HashMap<>(); - _tableMaxEndTimeMs = new HashMap<>(); + _tableMaxValidBucketEndTimeMs = new HashMap<>(); } @Override @@ -252,15 +252,13 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { long bucketEndMs = bucketStartMs + bucketMs; // Create delay metrics even if there's no task scheduled, this helps the case that the controller is restarted // but the metrics are not available until the controller schedules a valid task - long maxEndTimeMs = Long.MIN_VALUE; + long maxValidBucketEndTimeMs = Long.MIN_VALUE; for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) { - long currentEndTimeMs = preSelectedSegment.getEndTimeMs(); - // Compute maxEndTimeMs among segments that are valid for merge - if (currentEndTimeMs < System.currentTimeMillis() - bufferMs) { - maxEndTimeMs = Math.max(maxEndTimeMs, currentEndTimeMs); - } + // Compute maxValidBucketEndTimeMs among segments that are ready for merge + long currentValidBucketEndTimeMs = getValidBucketEndTimeMsForSegment(preSelectedSegment, bucketMs, bufferMs); + maxValidBucketEndTimeMs = Math.max(maxValidBucketEndTimeMs, currentValidBucketEndTimeMs); } - createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, maxEndTimeMs, + createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, maxValidBucketEndTimeMs, bufferMs, bucketMs); if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) { LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet", @@ -346,8 +344,8 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { watermarkMs, newWatermarkMs); // Update the delay metrics - createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs, maxEndTimeMs, - bufferMs, bucketMs); + createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs, + maxValidBucketEndTimeMs, 
bufferMs, bucketMs); // Create task configs int maxNumRecordsPerTask = @@ -440,6 +438,28 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { } /** + * Get the valid bucket end time before the buffer (now - bufferMs). Consider the segment as multiple contiguous + * time buckets, this function will return the last bucket end time before the buffer. Return LONG.MIN_VALUE if + * there's no valid bucket before the buffer. + */ + private long getValidBucketEndTimeMsForSegment(SegmentZKMetadata segmentZKMetadata, long bucketMs, long bufferMs) { + // Make sure the segment is ready for merge (the first bucket <= now - bufferTime) + long currentTimeMs = System.currentTimeMillis(); + long firstBucketEndTimeMs = segmentZKMetadata.getStartTimeMs() / bucketMs * bucketMs + bucketMs; + if (firstBucketEndTimeMs > currentTimeMs - bufferMs) { + return Long.MIN_VALUE; + } + // The validBucketEndTime is calculated as the min(segment end time, now - bufferTime) rounded to the bucket + // boundary. + // Notice bucketEndTime is exclusive while segment end time is inclusive. E.g. if bucketTime = 1d, + // the rounded segment end time of [10/1 00:00, 10/1 23:59] is 10/2 00:00. The rounded segment end time of + // [10/1 00:00, 10/2 00:00] is 10/3 00:00 + long validBucketEndTimeMs = (segmentZKMetadata.getEndTimeMs() / bucketMs + 1) * bucketMs; + validBucketEndTimeMs = Math.min(validBucketEndTimeMs, (currentTimeMs - bucketMs) / bucketMs * bucketMs); + return validBucketEndTimeMs; + } + + /** * Check if the segment span multiple buckets */ private boolean hasSpilledOverData(SegmentZKMetadata segmentZKMetadata, long bucketMs) { @@ -562,17 +582,16 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { /** * Update the delay metrics for the given table and merge level. We create the new gauge metric if the metric is not * available. 
- * * @param tableNameWithType table name with type * @param mergeLevel merge level * @param lowerMergeLevel lower merge level * @param watermarkMs current watermark value - * @param maxEndTimeMs max end time of all the segments for the table + * @param maxValidBucketEndTimeMs max valid bucket end time of all the segments for the table * @param bufferTimeMs buffer time * @param bucketTimeMs bucket time */ private void createOrUpdateDelayMetrics(String tableNameWithType, String mergeLevel, String lowerMergeLevel, - long watermarkMs, long maxEndTimeMs, long bufferTimeMs, long bucketTimeMs) { + long watermarkMs, long maxValidBucketEndTimeMs, long bufferTimeMs, long bucketTimeMs) { ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); if (controllerMetrics == null) { return; @@ -581,7 +600,7 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { // Update gauge value that indicates the delay in terms of the number of time buckets. Map<String, Long> watermarkForTable = _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>()); - _tableMaxEndTimeMs.put(tableNameWithType, maxEndTimeMs); + _tableMaxValidBucketEndTimeMs.put(tableNameWithType, maxValidBucketEndTimeMs); watermarkForTable.compute(mergeLevel, (k, v) -> { if (v == null) { LOGGER.info( @@ -589,11 +608,11 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType, mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs, getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, lowerMergeLevel == null - ? _tableMaxEndTimeMs.get(tableNameWithType) : watermarkForTable.get(lowerMergeLevel), + ? 
_tableMaxValidBucketEndTimeMs.get(tableNameWithType) : watermarkForTable.get(lowerMergeLevel), bufferTimeMs, bucketTimeMs)); controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel), (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L), - lowerMergeLevel == null ? _tableMaxEndTimeMs.get(tableNameWithType) + lowerMergeLevel == null ? _tableMaxValidBucketEndTimeMs.get(tableNameWithType) : watermarkForTable.get(lowerMergeLevel), bufferTimeMs, bucketTimeMs))); } return watermarkMs; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org