snleee commented on a change in pull request #7368: URL: https://github.com/apache/pinot/pull/7368#discussion_r717991630
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -339,6 +405,18 @@ public String getTaskType() { .info("Finished generating task configs for table: {} for task: {}, numTasks: {}", offlineTableName, taskType, pinotTaskConfigsForTable.size()); } + + // Reset watermarks for invalid tables Review comment: I was thinking of the case where we added the merge config and then removed. ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +541,57 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); + } + + private void setWatermarkMs(String offlineTableName, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info( + "Setting wartermark for table: {} and mergeLevel: {} is {} (watermarkMs={}, bufferTimeMs={}, bucketTimeMs={})", + offlineTableName, mergeLevel, getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs), + watermarkMs, bucketTimeMs, bucketTimeMs); + + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + + // Update gauge value that indicates the delay in terms of the number of time buckets. 
+ MetricKey metricKey = new MetricKey(offlineTableName, mergeLevel); + _mergeRollupWatermarks.compute(metricKey, (k, v) -> { + if (v == null) { + controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(metricKey.toString()), + (() -> getMergeRollupTaskDelayInNumTimeBuckets(_mergeRollupWatermarks.getOrDefault(k, 0L), bufferTimeMs, + bucketTimeMs))); + } + return watermarkMs; + }); + } + + private void resetWatermarkMs(String tableNameWithType) { + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + Iterator<MetricKey> it = _mergeRollupWatermarks.keySet().iterator(); + while (it.hasNext()) { + MetricKey metricKey = it.next(); + if (metricKey.getTableNameWithType().equals(tableNameWithType)) { + it.remove(); + controllerMetrics.removeCallbackGauge(getMetricNameForTaskDelay(metricKey.toString())); Review comment: There can be multiple `MetricKey`s with the same `tableName`. We will have 1 entry per `<tableName, mergeLevel>` pair. So, we will need to traverse the entire map... 
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +541,57 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); + } + + private void setWatermarkMs(String offlineTableName, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info( + "Setting wartermark for table: {} and mergeLevel: {} is {} (watermarkMs={}, bufferTimeMs={}, bucketTimeMs={})", + offlineTableName, mergeLevel, getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs), + watermarkMs, bucketTimeMs, bucketTimeMs); + + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + + // Update gauge value that indicates the delay in terms of the number of time buckets. 
+ MetricKey metricKey = new MetricKey(offlineTableName, mergeLevel); + _mergeRollupWatermarks.compute(metricKey, (k, v) -> { + if (v == null) { + controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(metricKey.toString()), + (() -> getMergeRollupTaskDelayInNumTimeBuckets(_mergeRollupWatermarks.getOrDefault(k, 0L), bufferTimeMs, + bucketTimeMs))); + } + return watermarkMs; + }); + } + + private void resetWatermarkMs(String tableNameWithType) { + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + Iterator<MetricKey> it = _mergeRollupWatermarks.keySet().iterator(); + while (it.hasNext()) { + MetricKey metricKey = it.next(); + if (metricKey.getTableNameWithType().equals(tableNameWithType)) { + it.remove(); + controllerMetrics.removeCallbackGauge(getMetricNameForTaskDelay(metricKey.toString())); Review comment: There can be multiple `MetricKey`s with the same `tableName`. We will have 1 entry per `<tableName, mergeLevel>` pair. So, we will need to traverse the entire map... 
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -92,13 +97,69 @@ public class MergeRollupTaskGenerator implements PinotTaskGenerator { private static final int DEFAULT_MAX_NUM_RECORDS_PER_TASK = 50_000_000; private static final String REFRESH = "REFRESH"; + private static final String MERGE_ROLLUP_TASK_DELAY_IN_NUM_BUCKETS = "mergeRollupTaskDelayInNumBuckets"; private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskGenerator.class); - Review comment: fixed ########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java ########## @@ -306,6 +306,8 @@ public void testSingleLevelConcat() // Check total tasks assertEquals(numTasks, 5); + Assert.assertTrue(_controllerStarter.getControllerMetrics().containsGauge("myTable1_OFFLINE.100days")); Review comment: I changed the test to make it consistent. I removed `Assert.`. ########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -277,6 +279,10 @@ public String getTaskType() { } Long prevWatermarkMs = mergeRollupTaskMetadata.getWatermarkMap().put(mergeLevel, windowStartMs); + if (prevWatermarkMs != null) { Review comment: @jtao15 @Jackie-Jiang Updated. Can you guys take a look again? I removed this check. `watermarkMs` cannot be null so we can just call `setWatermarkMs` without the check. 
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -110,17 +171,23 @@ public String getTaskType() { public List<PinotTaskConfig> generateTasks(List<TableConfig> tableConfigs) { String taskType = MergeRollupTask.TASK_TYPE; List<PinotTaskConfig> pinotTaskConfigs = new ArrayList<>(); - + Set<String> candidateMergeTables = new HashSet<>(); for (TableConfig tableConfig : tableConfigs) { if (!validate(tableConfig, taskType)) { continue; } String offlineTableName = tableConfig.getTableName(); LOGGER.info("Start generating task configs for table: {} for task: {}", offlineTableName, taskType); + candidateMergeTables.add(offlineTableName); // Get all segment metadata List<SegmentZKMetadata> allSegments = _clusterInfoAccessor.getSegmentsZKMetadata(offlineTableName); + // Reset the watermark time if no segment found + if (allSegments.isEmpty()) { Review comment: As we discussed, we still want to keep this because we want to cover the case when we deleted the segments for the existing table. I added more comments. ########## File path: pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MergeRollupMinionClusterIntegrationTest.java ########## @@ -259,7 +260,7 @@ public void testSingleLevelConcat() // {merged_100days_T4_1_16400_16404_1, myTable1_16405_16435_2} // -> {merged_100days_T5_0_myTable1_16400_16435_0} - String sqlQuery = "SELECT count(*) FROM mytable1"; // 115545 rows for the test table + String sqlQuery = "SELECT count(*) FROM myTable1"; // 115545 rows for the test table Review comment: They probably were checking the equality for the empty response..? 
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +541,57 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); + } + + private void setWatermarkMs(String offlineTableName, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info( + "Setting wartermark for table: {} and mergeLevel: {} is {} (watermarkMs={}, bufferTimeMs={}, bucketTimeMs={})", + offlineTableName, mergeLevel, getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs), + watermarkMs, bucketTimeMs, bucketTimeMs); + + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + + // Update gauge value that indicates the delay in terms of the number of time buckets. 
+ MetricKey metricKey = new MetricKey(offlineTableName, mergeLevel); + _mergeRollupWatermarks.compute(metricKey, (k, v) -> { + if (v == null) { + controllerMetrics.addCallbackGaugeIfNeeded(getMetricNameForTaskDelay(metricKey.toString()), + (() -> getMergeRollupTaskDelayInNumTimeBuckets(_mergeRollupWatermarks.getOrDefault(k, 0L), bufferTimeMs, + bucketTimeMs))); + } + return watermarkMs; + }); + } + + private void resetWatermarkMs(String tableNameWithType) { + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { + return; + } + Iterator<MetricKey> it = _mergeRollupWatermarks.keySet().iterator(); + while (it.hasNext()) { + MetricKey metricKey = it.next(); + if (metricKey.getTableNameWithType().equals(tableNameWithType)) { + it.remove(); + controllerMetrics.removeCallbackGauge(getMetricNameForTaskDelay(metricKey.toString())); Review comment: @jackjlli I changed the watermark map to a two-level map (tableName -> mergeLevel -> watermark). Can you go over it one more time? 
########## File path: pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java ########## @@ -463,4 +490,71 @@ private long getWatermarkMs(long minStartTimeMs, long bucketMs, String mergeLeve return pinotTaskConfigs; } + + private long getMergeRollupTaskDelayInNumTimeBuckets(long watermarkMs, long bufferTimeMs, long bucketTimeMs) { + if (bufferTimeMs == 0 || watermarkMs == -1) { + return 0; + } + return (long) Math.floor((System.currentTimeMillis() - watermarkMs - bufferTimeMs) / (double) bucketTimeMs); + } + + private void setWatermarkMs(String tableNameWithType, String mergeLevel, long watermarkMs, long bufferTimeMs, + long bucketTimeMs) { + LOGGER.info( + "Setting watermark for table: {} and mergeLevel: {} is {} (watermarkMs={}, bufferTimeMs={}, bucketTimeMs={})", + tableNameWithType, mergeLevel, getMergeRollupTaskDelayInNumTimeBuckets(watermarkMs, bufferTimeMs, bucketTimeMs), + watermarkMs, bucketTimeMs, bucketTimeMs); + + ControllerMetrics controllerMetrics = _clusterInfoAccessor.getControllerMetrics(); + if (controllerMetrics == null) { + return; + } + + PinotMetricsRegistry metricsRegistry = controllerMetrics.getMetricsRegistry(); + if (metricsRegistry == null) { Review comment: This is needed for testing. I saw some of the tests fail due to a null metricsRegistry. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org