This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 26a83ba4f3 [bugfix] fix mergeRollupTask metrics (#9864)
26a83ba4f3 is described below

commit 26a83ba4f302b8b402d38d437f635ff1a4903767
Author: Haitao Zhang <hai...@startree.ai>
AuthorDate: Mon Nov 28 21:32:45 2022 -0800

    [bugfix] fix mergeRollupTask metrics (#9864)

    * [bugfix] fix mergeRollupTask metrics

    * fix a typo

    * add comments

    * fix comments
---
 .../mergerollup/MergeRollupTaskGenerator.java | 43 ++++++++++++----------
 1 file changed, 24 insertions(+), 19 deletions(-)

diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
index 28043c2f60..fc7be90657 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java
@@ -115,8 +115,8 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
   // tableNameWithType -> mergeLevel -> watermarkMs
   private final Map<String, Map<String, Long>> _mergeRollupWatermarks = new HashMap<>();
 
-  // tableNameWithType -> maxValidBucketEndTime
-  private final Map<String, Long> _tableMaxValidBucketEndTimeMs = new HashMap<>();
+  // tableNameWithType -> lowestLevelMaxValidBucketEndTime
+  private final Map<String, Long> _tableLowestLevelMaxValidBucketEndTimeMs = new HashMap<>();
 
   @Override
   public String getTaskType() {
@@ -244,16 +244,20 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
             getWatermarkMs(preSelectedSegments.get(0).getStartTimeMs(), bucketMs, mergeLevel, mergeRollupTaskMetadata);
         long bucketStartMs = watermarkMs;
         long bucketEndMs = bucketStartMs + bucketMs;
+        if (lowerMergeLevel == null) {
+          long lowestLevelMaxValidBucketEndTimeMs = Long.MIN_VALUE;
+          for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
+            // Compute lowestLevelMaxValidBucketEndTimeMs among segments that are ready for merge
+            long currentValidBucketEndTimeMs =
+                getValidBucketEndTimeMsForSegment(preSelectedSegment, bucketMs, bufferMs);
+            lowestLevelMaxValidBucketEndTimeMs =
+                Math.max(lowestLevelMaxValidBucketEndTimeMs, currentValidBucketEndTimeMs);
+          }
+          _tableLowestLevelMaxValidBucketEndTimeMs.put(offlineTableName, lowestLevelMaxValidBucketEndTimeMs);
+        }
         // Create delay metrics even if there's no task scheduled, this helps the case that the controller is restarted
         // but the metrics are not available until the controller schedules a valid task
-        long maxValidBucketEndTimeMs = Long.MIN_VALUE;
-        for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
-          // Compute maxValidBucketEndTimeMs among segments that are ready for merge
-          long currentValidBucketEndTimeMs = getValidBucketEndTimeMsForSegment(preSelectedSegment, bucketMs, bufferMs);
-          maxValidBucketEndTimeMs = Math.max(maxValidBucketEndTimeMs, currentValidBucketEndTimeMs);
-        }
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, maxValidBucketEndTimeMs,
-            bufferMs, bucketMs);
+        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, null, watermarkMs, bufferMs, bucketMs);
         if (!isValidBucketEndTime(bucketEndMs, bufferMs, lowerMergeLevel, mergeRollupTaskMetadata)) {
           LOGGER.info("Bucket with start: {} and end: {} (table : {}, mergeLevel : {}) cannot be merged yet",
               bucketStartMs, bucketEndMs, offlineTableName, mergeLevel);
@@ -271,6 +275,9 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
         // For each bucket find all segments overlapping with the target bucket, skip the bucket if all overlapping
         // segments are merged.
Schedule k (numParallelBuckets) buckets at most, and stops at the first bucket that
         // contains spilled over data.
+        // One may wonder how a segment with records spanning different buckets is handled. The short answer is that
+        // it will be cut into multiple segments, each for a separate bucket. This is achieved by setting bucket time
+        // period as PARTITION_BUCKET_TIME_PERIOD when generating PinotTaskConfigs
         // 2. There's no bucket with unmerged segments, skip scheduling
         for (SegmentZKMetadata preSelectedSegment : preSelectedSegments) {
           long startTimeMs = preSelectedSegment.getStartTimeMs();
@@ -338,8 +345,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
             watermarkMs, newWatermarkMs);
 
         // Update the delay metrics
-        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs,
-            maxValidBucketEndTimeMs, bufferMs, bucketMs);
+        createOrUpdateDelayMetrics(offlineTableName, mergeLevel, lowerMergeLevel, newWatermarkMs, bufferMs, bucketMs);
 
         // Create task configs
         int maxNumRecordsPerTask =
@@ -459,7 +465,7 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
     // the rounded segment end time of [10/1 00:00, 10/1 23:59] is 10/2 00:00.
The rounded segment end time of
     // [10/1 00:00, 10/2 00:00] is 10/3 00:00
     long validBucketEndTimeMs = (segmentZKMetadata.getEndTimeMs() / bucketMs + 1) * bucketMs;
-    validBucketEndTimeMs = Math.min(validBucketEndTimeMs, (currentTimeMs - bucketMs) / bucketMs * bucketMs);
+    validBucketEndTimeMs = Math.min(validBucketEndTimeMs, (currentTimeMs - bufferMs) / bucketMs * bucketMs);
     return validBucketEndTimeMs;
   }
 
@@ -603,12 +609,11 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
    * @param mergeLevel merge level
    * @param lowerMergeLevel lower merge level
    * @param watermarkMs current watermark value
-   * @param maxValidBucketEndTimeMs max valid bucket end time of all the segments for the table
    * @param bufferTimeMs buffer time
    * @param bucketTimeMs bucket time
    */
   private void createOrUpdateDelayMetrics(String tableNameWithType, String mergeLevel, String lowerMergeLevel,
-      long watermarkMs, long maxValidBucketEndTimeMs, long bufferTimeMs, long bucketTimeMs) {
+      long watermarkMs, long bufferTimeMs, long bucketTimeMs) {
     ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics();
     if (controllerMetrics == null) {
       return;
@@ -617,19 +622,19 @@ public class MergeRollupTaskGenerator extends BaseTaskGenerator {
     // Update gauge value that indicates the delay in terms of the number of time buckets.
     Map<String, Long> watermarkForTable =
         _mergeRollupWatermarks.computeIfAbsent(tableNameWithType, k -> new ConcurrentHashMap<>());
-    _tableMaxValidBucketEndTimeMs.put(tableNameWithType, maxValidBucketEndTimeMs);
     watermarkForTable.compute(mergeLevel, (k, v) -> {
       if (v == null) {
         LOGGER.info(
             "Creating the gauge metric for tracking the merge/roll-up task delay for table: {} and mergeLevel: {}."
                 + "(watermarkMs={}, bufferTimeMs={}, bucketTimeMs={}, taskDelayInNumTimeBuckets={})", tableNameWithType,
-            mergeLevel, watermarkMs, bucketTimeMs, bucketTimeMs,
+            mergeLevel, watermarkMs, bufferTimeMs, bucketTimeMs,
             getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, lowerMergeLevel == null
-                    ? _tableMaxValidBucketEndTimeMs.get(tableNameWithType) : watermarkForTable.get(lowerMergeLevel),
+                    ? _tableLowestLevelMaxValidBucketEndTimeMs.get(tableNameWithType)
+                    : watermarkForTable.get(lowerMergeLevel),
                 bufferTimeMs, bucketTimeMs));
         controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(tableNameWithType, mergeLevel),
             (() -> getMergeRollupTaskDelayInNumTimeBuckets(watermarkForTable.getOrDefault(k, -1L),
-                lowerMergeLevel == null ? _tableMaxValidBucketEndTimeMs.get(tableNameWithType)
+                lowerMergeLevel == null ? _tableLowestLevelMaxValidBucketEndTimeMs.get(tableNameWithType)
                     : watermarkForTable.get(lowerMergeLevel), bufferTimeMs, bucketTimeMs)));
       }
       return watermarkMs;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org