This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ee441e18d Metrics that Tracks the Progress of each Table Rebalance 
Job (#15518)
4ee441e18d is described below

commit 4ee441e18db02d94fcc5d4a2affddaf20d825290
Author: Jhow <44998515+j-howhu...@users.noreply.github.com>
AuthorDate: Fri Apr 25 14:49:20 2025 -0700

    Metrics that Tracks the Progress of each Table Rebalance Job (#15518)
---
 .../configs/controller.yml                         |  9 +++++
 .../pinot/common/metrics/ControllerGauge.java      |  6 +++-
 .../ControllerPrometheusMetricsTest.java           |  5 +++
 .../prometheus/PinotPrometheusMetricsTest.java     |  6 ++++
 .../rebalance/ZkBasedTableRebalanceObserver.java   | 40 ++++++++++++++++++++++
 .../TestZkBasedTableRebalanceObserver.java         | 27 +++++++++++++--
 6 files changed, 89 insertions(+), 4 deletions(-)

diff --git 
a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml 
b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
index 4b06ad572e..5023cc639d 100644
--- a/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
+++ b/docker/images/pinot/etc/jmx_prometheus_javaagent/configs/controller.yml
@@ -48,6 +48,15 @@ rules:
     table: "$2$4"
     tableType: "$5"
     taskType: "$6"
+# Gauge for table rebalance job progress (percent), labeled by tableNameWithType and rebalance job ID
+- pattern: 
"\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\", 
name=\"pinot\\.controller\\.tableRebalanceJobProgressPercent\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\.([0-9a-fA-F-]+)\"><>(\\w+)"
+  name: "pinot_controller_tableRebalanceJobProgressPercent_$6"
+  cache: true
+  labels:
+    database: "$2"
+    table: "$1$3"
+    tableType: "$4"
+    jobId: "$5"
   # Special handling for timers like cronSchedulerJobExecutionTimeMs and 
tableRebalanceExecutionTimeMs which use table name, table type and another 
string for status / taskType
 - pattern: 
"\"org\\.apache\\.pinot\\.common\\.metrics\"<type=\"ControllerMetrics\", 
name=\"pinot\\.controller\\.(([^.]+)\\.)?([^.]*)_(OFFLINE|REALTIME)\\.(\\w+)\\.cronSchedulerJobExecutionTimeMs\"><>(\\w+)"
   name: "pinot_controller_cronSchedulerJobExecutionTimeMs_$6"
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 51ff09387c..ddbb67a049 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -216,7 +216,11 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   // Bytes to be written to deep store
   DEEP_STORE_WRITE_BYTES_IN_PROGRESS("deepStoreWriteBytesInProgress", true),
   // Count of deep store segment writes that are currently in progress
-  DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true);
+  DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true),
+
+  // Progress (in percent) of a table rebalance job for a given table
+  TABLE_REBALANCE_JOB_PROGRESS_PERCENT("percent", false);
+
 
   private final String _gaugeName;
   private final String _unit;
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
index 45583184d4..0cf11bc020 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ControllerPrometheusMetricsTest.java
@@ -178,6 +178,11 @@ public abstract class ControllerPrometheusMetricsTest 
extends PinotPrometheusMet
             String.format("%s.%s", 
ExportedLabelValues.CONTROLLER_PERIODIC_TASK_CHC, TaskState.IN_PROGRESS));
         
assertGaugeExportedCorrectly(ControllerGauge.TASK_STATUS.getGaugeName(),
             ExportedLabels.JOBSTATUS_CONTROLLER_TASKTYPE, 
EXPORTED_METRIC_PREFIX);
+      } else if (controllerGauge == 
ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT) {
+        addGaugeWithLabels(controllerGauge,
+            String.format("%s.%s", TABLE_NAME_WITH_TYPE, REBALANCE_JOB_ID));
+        assertGaugeExportedCorrectly(controllerGauge.getGaugeName(),
+            ExportedLabels.JOBID_TABLENAME_TABLETYPE, EXPORTED_METRIC_PREFIX);
       } else {
         addGaugeWithLabels(controllerGauge, TABLE_NAME_WITH_TYPE);
         assertGaugeExportedCorrectly(controllerGauge.getGaugeName(), 
ExportedLabels.TABLENAME_TABLETYPE,
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
index 5173bc00df..f7d0d7462e 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/PinotPrometheusMetricsTest.java
@@ -37,6 +37,7 @@ import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -68,6 +69,7 @@ public abstract class PinotPrometheusMetricsTest {
   protected static final String PARTITION_GROUP_ID = "partitionGroupId";
   protected static final String CLIENT_ID =
       String.format("%s-%s-%s", TABLE_NAME_WITH_TYPE, KAFKA_TOPIC, 
PARTITION_GROUP_ID);
+  protected static final String REBALANCE_JOB_ID = 
UUID.randomUUID().toString();
 
   protected HttpClient _httpClient;
 
@@ -336,6 +338,9 @@ public abstract class PinotPrometheusMetricsTest {
     public static final List<String> TASKTYPE_TABLENAME_TABLETYPE =
         List.of(TASKTYPE, ExportedLabelValues.MINION_TASK_SEGMENT_IMPORT, 
TABLE, ExportedLabelValues.TABLENAME,
             TABLETYPE, TABLETYPE_REALTIME);
+
+    public static final List<String> JOBID_TABLENAME_TABLETYPE =
+        List.of(JOBID, REBALANCE_JOB_ID, TABLE, ExportedLabelValues.TABLENAME, 
TABLETYPE, TABLETYPE_REALTIME);
   }
 
   public static class ExportedLabelKeys {
@@ -348,6 +353,7 @@ public abstract class PinotPrometheusMetricsTest {
     public static final String PERIODIC_TASK = "periodicTask";
     public static final String STATUS = "status";
     public static final String DATABASE = "database";
+    public static final String JOBID = "jobId";
   }
 
   public static class ExportedLabelValues {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 125b0568bc..6b12a9a225 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -84,6 +84,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     switch (trigger) {
       case START_TRIGGER:
         updateOnStart(currentState, targetState, rebalanceContext);
+        
emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
         trackStatsInZk();
         updatedStatsInZk = true;
         break;
@@ -104,6 +105,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
           }
           if 
(!_tableRebalanceProgressStats.getRebalanceProgressStatsOverall().equals(latestProgress))
 {
             
_tableRebalanceProgressStats.setRebalanceProgressStatsOverall(latestProgress);
+            emitProgressMetric(latestProgress);
           }
           trackStatsInZk();
           updatedStatsInZk = true;
@@ -129,6 +131,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
           if 
(!_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep().equals(latestProgress))
 {
             
_tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(latestProgress);
           }
+          
emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
           trackStatsInZk();
           updatedStatsInZk = true;
         }
@@ -199,6 +202,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
         new TableRebalanceProgressStats.RebalanceProgressStats();
     
_tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep(progressStats);
     trackStatsInZk();
+    emitProgressMetricDone();
   }
 
   @Override
@@ -234,6 +238,37 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     return _numUpdatesToZk;
   }
 
+  /**
+   * Emits the rebalance progress in percent to the metrics. The progress is derived from the number of remaining
+   * segments to be added, deleted, or converged relative to the total number of segments to be added and deleted.
+   * Notice that for some jobs the metric may not be exactly accurate and may not reach 100% when the job is done
+   * (e.g. when `lowDiskMode=false`, the job finishes without waiting for `totalRemainingSegmentsToBeDeleted`
+   * to become 0, or when `bestEffort=true` the job finishes without waiting for `totalRemainingSegmentsToBeAdded`,
+   * `totalRemainingSegmentsToBeDeleted`, and `totalRemainingSegmentsToConverge` to become 0).
+   * Therefore `emitProgressMetricDone()` should be called to emit the final progress at the time the job exits.
+   * @param overallProgress the latest overall progress
+   */
+  private void 
emitProgressMetric(TableRebalanceProgressStats.RebalanceProgressStats 
overallProgress) {
+    // Round this up so the metric is 100 only when no segment remains
+    long progressPercent = 100 - (long) 
Math.ceil(TableRebalanceProgressStats.calculatePercentageChange(
+        overallProgress._totalSegmentsToBeAdded + 
overallProgress._totalSegmentsToBeDeleted,
+        overallProgress._totalRemainingSegmentsToBeAdded + 
overallProgress._totalRemainingSegmentsToBeDeleted
+            + overallProgress._totalRemainingSegmentsToConverge));
+    // Use the original job ID so that retries of the same rebalance job are reported under the same label
+    _controllerMetrics.setValueOfTableGauge(_tableNameWithType + "." + 
_tableRebalanceContext.getOriginalJobId(),
+        ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT,
+        progressPercent < 0 ? 0 : progressPercent);
+  }
+
+  /**
+   * Emits the rebalance progress as 100 (%) to the metrics. This ensures the reported progress is aligned with the
+   * job's completion once the job is done, avoiding confusion.
+   */
+  private void emitProgressMetricDone() {
+    _controllerMetrics.setValueOfTableGauge(_tableNameWithType + "." + 
_tableRebalanceContext.getOriginalJobId(),
+        ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT, 100);
+  }
+
   @VisibleForTesting
   TableRebalanceContext getTableRebalanceContext() {
     return _tableRebalanceContext;
@@ -297,6 +332,11 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     return jobMetadata;
   }
 
+  @VisibleForTesting
+  TableRebalanceProgressStats getTableRebalanceProgressStats() {
+    return _tableRebalanceProgressStats;
+  }
+
   /**
    * Takes in targetState and sourceState and computes stats based on the 
comparison between sourceState and
    * targetState.This captures how far the source state is from the target 
state. Example - if there are 4 segments and
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index 0d2f395d79..e80530c082 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -24,10 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
@@ -38,6 +38,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -49,11 +50,12 @@ public class TestZkBasedTableRebalanceObserver {
     PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
     // Mocking this. We will verify using numZkUpdate stat
     when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), 
any())).thenReturn(true);
-    ControllerMetrics controllerMetrics = 
Mockito.mock(ControllerMetrics.class);
+    ControllerMetrics controllerMetrics = ControllerMetrics.get();
     TableRebalanceContext retryCtx = new TableRebalanceContext();
     retryCtx.setConfig(new RebalanceConfig());
+    retryCtx.setOriginalJobId("testZkObserverTracking");
     ZkBasedTableRebalanceObserver observer =
-        new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, 
pinotHelixResourceManager);
+        new ZkBasedTableRebalanceObserver("dummy", "testZkObserverTracking", 
retryCtx, pinotHelixResourceManager);
     Map<String, Map<String, String>> source = new TreeMap<>();
     Map<String, Map<String, String>> target = new TreeMap<>();
     target.put("segment1",
@@ -67,16 +69,21 @@ public class TestZkBasedTableRebalanceObserver {
         segmentSet, segmentSet);
     observer.onTrigger(TableRebalanceObserver.Trigger.START_TRIGGER, source, 
target, rebalanceContext);
     assertEquals(observer.getNumUpdatesToZk(), 1);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, 
source, source, rebalanceContext);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
 source, source,
         rebalanceContext);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     // START_TRIGGER will set up the ZK progress stats to have the diff 
between source and target. When calling the
     // triggers for IS and EV-IS, since source and source are compared, the 
diff will change for the IS trigger
     // but not for the EV-IS trigger, so ZK must be updated 1 extra time
     assertEquals(observer.getNumUpdatesToZk(), 2);
     
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, 
source, target, rebalanceContext);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
 source, target,
         rebalanceContext);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     // Both of the changes above will update ZK for progress stats
     assertEquals(observer.getNumUpdatesToZk(), 4);
     // Try a rollback and this should trigger a ZK update as well
@@ -84,6 +91,20 @@ public class TestZkBasedTableRebalanceObserver {
     assertEquals(observer.getNumUpdatesToZk(), 5);
   }
 
+  private void checkProgressPercentMetrics(ControllerMetrics controllerMetrics,
+      ZkBasedTableRebalanceObserver observer) {
+    Long progressGaugeValue = controllerMetrics.getGaugeValue(
+        ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT.getGaugeName() + 
".dummy.testZkObserverTracking");
+    assertNotNull(progressGaugeValue);
+    TableRebalanceProgressStats.RebalanceProgressStats overallProgress =
+        
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+    long progressRemained = (long) 
Math.ceil(TableRebalanceProgressStats.calculatePercentageChange(
+        overallProgress._totalSegmentsToBeAdded + 
overallProgress._totalSegmentsToBeDeleted,
+        overallProgress._totalRemainingSegmentsToBeAdded + 
overallProgress._totalRemainingSegmentsToBeDeleted
+            + overallProgress._totalRemainingSegmentsToConverge));
+    assertEquals(progressGaugeValue, progressRemained > 100 ? 0 : 100 - 
progressRemained);
+  }
+
   @Test
   void testDifferenceBetweenTableRebalanceStates() {
     Map<String, Map<String, String>> target = new TreeMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to