This is an automated email from the ASF dual-hosted git repository. somandal pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4ee441e18d Metrics that Tracks the Progress of each Table Rebalance Job (#15518) 4ee441e18d is described below commit 4ee441e18db02d94fcc5d4a2affddaf20d825290 Author: Jhow <44998515+j-howhu...@users.noreply.github.com> AuthorDate: Fri Apr 25 14:49:20 2025 -0700 Metrics that Tracks the Progress of each Table Rebalance Job (#15518) --- .../configs/controller.yml | 9 +++++ .../pinot/common/metrics/ControllerGauge.java | 6 +++- .../ControllerPrometheusMetricsTest.java | 5 +++ .../prometheus/PinotPrometheusMetricsTest.java | 6 ++++ .../rebalance/ZkBasedTableRebalanceObserver.java | 40 ++++++++++++++++++++++ .../TestZkBasedTableRebalanceObserver.java | 27 +++++++++++++-- 6 files changed, 89 insertions(+), 4 deletions(-) diff --git a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml index 4b06ad572e..5023cc639d 100644 --- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml +++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml @@ -48,6 +48,15 @@ rules: table: "$2$4" tableType: "$5" taskType: "$6" +# Gauge for rebalanceJobId and tableNameWithType +- pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\", name=\"pinot\\.controller\\.tableRebalanceJobProgressPercent\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\.([0-9a-fA-F-]+)\"><>(\\w+)" + name: "pinot_controller_tableRebalanceJobProgressPercent_$6" + cache: true + labels: + database: "$2" + table: "$1$3" + tableType: "$4" + jobId: "$5" # Special handling for timers like cronSchedulerJobExecutionTimeMs and tableRebalanceExecutionTimeMs which use table name, table type and another string for status / taskType - pattern: "\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\", 
name=\"pinot\\.controller\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\.(\\w+)\\.cronSchedulerJobExecutionTimeMs\"><>(\\w+)" name: "pinot_controller_cronSchedulerJobExecutionTimeMs_$6" diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java index 51ff09387c..ddbb67a049 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java @@ -216,7 +216,11 @@ public enum ControllerGauge implements AbstractMetrics.Gauge { // Bytes to be written to deep store DEEP_STORE_WRITE_BYTES_IN_PROGRESS("deepStoreWriteBytesInProgress", true), // Count of deep store segment writes that are currently in progress - DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true); + DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true), + + // The progress of a certain table rebalance job of a table + TABLE_REBALANCE_JOB_PROGRESS_PERCENT("percent", false); + private final String _gaugeName; private final String _unit; diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java index 45583184d4..0cf11bc020 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java @@ -178,6 +178,11 @@ public abstract class ControllerPrometheusMetricsTest extends PinotPrometheusMet String.format("%s.%s", ExportedLabelValues.CONTROLLER_PERIODIC_TASK_CHC, TaskState.IN_PROGRESS)); assertGaugeExportedCorrectly(ControllerGauge.TASK_STATUS.getGaugeName(), ExportedLabels.JOBSTATUS_CONTROLLER_TASKTYPE, EXPORTED_METRIC_PREFIX); + 
} else if (controllerGauge == ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT) { + addGaugeWithLabels(controllerGauge, + String.format("%s.%s", TABLE_NAME_WITH_TYPE, REBALANCE_JOB_ID)); + assertGaugeExportedCorrectly(controllerGauge.getGaugeName(), + ExportedLabels.JOBID_TABLENAME_TABLETYPE, EXPORTED_METRIC_PREFIX); } else { addGaugeWithLabels(controllerGauge, TABLE_NAME_WITH_TYPE); assertGaugeExportedCorrectly(controllerGauge.getGaugeName(), ExportedLabels.TABLENAME_TABLETYPE, diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java index 5173bc00df..f7d0d7462e 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java @@ -37,6 +37,7 @@ import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -68,6 +69,7 @@ public abstract class PinotPrometheusMetricsTest { protected static final String PARTITION_GROUP_ID = "partitionGroupId"; protected static final String CLIENT_ID = String.format("%s-%s-%s", TABLE_NAME_WITH_TYPE, KAFKA_TOPIC, PARTITION_GROUP_ID); + protected static final String REBALANCE_JOB_ID = UUID.randomUUID().toString(); protected HttpClient _httpClient; @@ -336,6 +338,9 @@ public abstract class PinotPrometheusMetricsTest { public static final List<String> TASKTYPE_TABLENAME_TABLETYPE = List.of(TASKTYPE, ExportedLabelValues.MINION_TASK_SEGMENT_IMPORT, TABLE, ExportedLabelValues.TABLENAME, TABLETYPE, TABLETYPE_REALTIME); + + public static final List<String> JOBID_TABLENAME_TABLETYPE = + List.of(JOBID, REBALANCE_JOB_ID, TABLE, ExportedLabelValues.TABLENAME, 
TABLETYPE, TABLETYPE_REALTIME); } public static class ExportedLabelKeys { @@ -348,6 +353,7 @@ public abstract class PinotPrometheusMetricsTest { public static final String PERIODIC_TASK = "periodicTask"; public static final String STATUS = "status"; public static final String DATABASE = "database"; + public static final String JOBID = "jobId"; } public static class ExportedLabelValues { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java index 125b0568bc..6b12a9a225 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java @@ -84,6 +84,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { switch (trigger) { case START_TRIGGER: updateOnStart(currentState, targetState, rebalanceContext); + emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall()); trackStatsInZk(); updatedStatsInZk = true; break; @@ -104,6 +105,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { } if (!_tableRebalanceProgressStats.getRebalanceProgressStatsOverall().equals(latestProgress)) { _tableRebalanceProgressStats.setRebalanceProgressStatsOverall(latestProgress); + emitProgressMetric(latestProgress); } trackStatsInZk(); updatedStatsInZk = true; @@ -129,6 +131,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { if (!_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep().equals(latestProgress)) { _tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(latestProgress); } + emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall()); trackStatsInZk(); 
updatedStatsInZk = true; } @@ -199,6 +202,7 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { new TableRebalanceProgressStats.RebalanceProgressStats(); _tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep(progressStats); trackStatsInZk(); + emitProgressMetricDone(); } @Override @@ -234,6 +238,37 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { return _numUpdatesToZk; } + /** + * Emits the rebalance progress in percent to the metrics. Uses the percentage of remaining segments to be added as + * the indicator of the overall progress. + * Notice that for some jobs, the metrics may not be exactly accurate and would not be 100% when the job is done. + * (e.g. when `lowDiskMode=false`, the job finishes without waiting for `totalRemainingSegmentsToBeDeleted` become + * 0, or when `bestEffort=true` the job finishes without waiting for both `totalRemainingSegmentsToBeAdded`, + * `totalRemainingSegmentsToBeDeleted`, and `totalRemainingSegmentsToConverge` become 0) + * Therefore `emitProgressMetricDone()` should be called to emit the final progress as the time job exits. + * @param overallProgress the latest overall progress + */ + private void emitProgressMetric(TableRebalanceProgressStats.RebalanceProgressStats overallProgress) { + // Round this up so the metric is 100 only when no segment remains + long progressPercent = 100 - (long) Math.ceil(TableRebalanceProgressStats.calculatePercentageChange( + overallProgress._totalSegmentsToBeAdded + overallProgress._totalSegmentsToBeDeleted, + overallProgress._totalRemainingSegmentsToBeAdded + overallProgress._totalRemainingSegmentsToBeDeleted + + overallProgress._totalRemainingSegmentsToConverge)); + // Using the original job ID to group rebalance retries together with the same label + _controllerMetrics.setValueOfTableGauge(_tableNameWithType + "." 
+ _tableRebalanceContext.getOriginalJobId(), + ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT, + progressPercent < 0 ? 0 : progressPercent); + } + + /** + * Emits the rebalance progress as 100 (%) to the metrics. This is to ensure that the progress is at least aligned + * when the job done to avoid confusion + */ + private void emitProgressMetricDone() { + _controllerMetrics.setValueOfTableGauge(_tableNameWithType + "." + _tableRebalanceContext.getOriginalJobId(), + ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT, 100); + } + @VisibleForTesting TableRebalanceContext getTableRebalanceContext() { return _tableRebalanceContext; @@ -297,6 +332,11 @@ public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver { return jobMetadata; } + @VisibleForTesting + TableRebalanceProgressStats getTableRebalanceProgressStats() { + return _tableRebalanceProgressStats; + } + /** * Takes in targetState and sourceState and computes stats based on the comparison between sourceState and * targetState.This captures how far the source state is from the target state. 
Example - if there are 4 segments and diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java index 0d2f395d79..e80530c082 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java @@ -24,10 +24,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; -import org.mockito.Mockito; import org.testng.annotations.Test; import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING; @@ -38,6 +38,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -49,11 +50,12 @@ public class TestZkBasedTableRebalanceObserver { PinotHelixResourceManager pinotHelixResourceManager = mock(PinotHelixResourceManager.class); // Mocking this. 
We will verify using numZkUpdate stat when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), any())).thenReturn(true); - ControllerMetrics controllerMetrics = Mockito.mock(ControllerMetrics.class); + ControllerMetrics controllerMetrics = ControllerMetrics.get(); TableRebalanceContext retryCtx = new TableRebalanceContext(); retryCtx.setConfig(new RebalanceConfig()); + retryCtx.setOriginalJobId("testZkObserverTracking"); ZkBasedTableRebalanceObserver observer = - new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, pinotHelixResourceManager); + new ZkBasedTableRebalanceObserver("dummy", "testZkObserverTracking", retryCtx, pinotHelixResourceManager); Map<String, Map<String, String>> source = new TreeMap<>(); Map<String, Map<String, String>> target = new TreeMap<>(); target.put("segment1", @@ -67,16 +69,21 @@ public class TestZkBasedTableRebalanceObserver { segmentSet, segmentSet); observer.onTrigger(TableRebalanceObserver.Trigger.START_TRIGGER, source, target, rebalanceContext); assertEquals(observer.getNumUpdatesToZk(), 1); + checkProgressPercentMetrics(controllerMetrics, observer); observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, source, source, rebalanceContext); + checkProgressPercentMetrics(controllerMetrics, observer); observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER, source, source, rebalanceContext); + checkProgressPercentMetrics(controllerMetrics, observer); // START_TRIGGER will set up the ZK progress stats to have the diff between source and target. 
When calling the // triggers for IS and EV-IS, since source and source are compared, the diff will change for the IS trigger // but not for the EV-IS trigger, so ZK must be updated 1 extra time assertEquals(observer.getNumUpdatesToZk(), 2); observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, source, target, rebalanceContext); + checkProgressPercentMetrics(controllerMetrics, observer); observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER, source, target, rebalanceContext); + checkProgressPercentMetrics(controllerMetrics, observer); // Both of the changes above will update ZK for progress stats assertEquals(observer.getNumUpdatesToZk(), 4); // Try a rollback and this should trigger a ZK update as well @@ -84,6 +91,20 @@ public class TestZkBasedTableRebalanceObserver { assertEquals(observer.getNumUpdatesToZk(), 5); } + private void checkProgressPercentMetrics(ControllerMetrics controllerMetrics, + ZkBasedTableRebalanceObserver observer) { + Long progressGaugeValue = controllerMetrics.getGaugeValue( + ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT.getGaugeName() + ".dummy.testZkObserverTracking"); + assertNotNull(progressGaugeValue); + TableRebalanceProgressStats.RebalanceProgressStats overallProgress = + observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall(); + long progressRemained = (long) Math.ceil(TableRebalanceProgressStats.calculatePercentageChange( + overallProgress._totalSegmentsToBeAdded + overallProgress._totalSegmentsToBeDeleted, + overallProgress._totalRemainingSegmentsToBeAdded + overallProgress._totalRemainingSegmentsToBeDeleted + + overallProgress._totalRemainingSegmentsToConverge)); + assertEquals(progressGaugeValue, progressRemained > 100 ? 
0 : 100 - progressRemained); + } + @Test void testDifferenceBetweenTableRebalanceStates() { Map<String, Map<String, String>> target = new TreeMap<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org