This is an automated email from the ASF dual-hosted git repository.

somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 78cb37c189 Metrics that Tracks the Progress of Table Rebalance Jobs 
(#15650)
78cb37c189 is described below

commit 78cb37c189dee21786309700efbdc068635b27af
Author: Jhow <44998515+j-howhu...@users.noreply.github.com>
AuthorDate: Fri May 2 17:59:32 2025 -0700

    Metrics that Tracks the Progress of Table Rebalance Jobs (#15650)
---
 .../pinot/common/metrics/ControllerGauge.java      |   6 +-
 .../rebalance/ZkBasedTableRebalanceObserver.java   |  42 +++++
 .../TestZkBasedTableRebalanceObserver.java         | 184 ++++++++++++++++++++-
 3 files changed, 229 insertions(+), 3 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 51ff09387c..ddbb67a049 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -216,7 +216,11 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   // Bytes to be written to deep store
   DEEP_STORE_WRITE_BYTES_IN_PROGRESS("deepStoreWriteBytesInProgress", true),
   // Count of deep store segment writes that are currently in progress
-  DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true);
+  DEEP_STORE_WRITE_OPS_IN_PROGRESS("deepStoreWriteOpsInProgress", true),
+
+  // The progress of a certain table rebalance job of a table
+  TABLE_REBALANCE_JOB_PROGRESS_PERCENT("percent", false);
+
 
   private final String _gaugeName;
   private final String _unit;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 5eaa80e91d..37da8bd9ec 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -84,6 +84,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     switch (trigger) {
       case START_TRIGGER:
         updateOnStart(currentState, targetState, rebalanceContext);
+        
emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
         trackStatsInZk();
         updatedStatsInZk = true;
         break;
@@ -104,6 +105,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
           }
           if 
(!_tableRebalanceProgressStats.getRebalanceProgressStatsOverall().equals(latestProgress))
 {
             
_tableRebalanceProgressStats.setRebalanceProgressStatsOverall(latestProgress);
+            emitProgressMetric(latestProgress);
           }
           trackStatsInZk();
           updatedStatsInZk = true;
@@ -129,6 +131,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
           if 
(!_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep().equals(latestProgress))
 {
             
_tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(latestProgress);
           }
+          
emitProgressMetric(_tableRebalanceProgressStats.getRebalanceProgressStatsOverall());
           trackStatsInZk();
           updatedStatsInZk = true;
         }
@@ -199,6 +202,7 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
         new TableRebalanceProgressStats.RebalanceProgressStats();
     
_tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep(progressStats);
     trackStatsInZk();
+    emitProgressMetricDone();
   }
 
   @Override
@@ -234,6 +238,39 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     return _numUpdatesToZk;
   }
 
+  /**
+   * Emits the rebalance progress in percent to the metrics, which is calculated as:
+   *   100% - (remaining / total) * 100%, where
+   *   remaining = totalRemainingSegmentsToBeAdded + totalRemainingSegmentsToBeDeleted + totalRemainingSegmentsToConverge
+   *               + totalCarryOverSegmentsToBeAdded + totalCarryOverSegmentsToBeDeleted
+   *   total = totalSegmentsToBeAdded + totalSegmentsToBeDeleted
+   * Note that for some jobs the metric may not be exactly accurate and may not reach 100% when the job is done
+   * (e.g. when `lowDiskMode=false`, the job finishes without waiting for `totalRemainingSegmentsToBeDeleted` to become
+   * 0, or when `bestEffort=true`, the job finishes without waiting for `totalRemainingSegmentsToBeAdded`,
+   * `totalRemainingSegmentsToBeDeleted`, and `totalRemainingSegmentsToConverge` to become 0).
+   * Therefore `emitProgressMetricDone()` should be called to emit the final progress at the time the job exits.
+   * @param overallProgress the latest overall progress
+   */
+  private void 
emitProgressMetric(TableRebalanceProgressStats.RebalanceProgressStats 
overallProgress) {
+    // Round this up so the metric is 100 only when no segment remains
+    long progressPercent = 100 - (long) 
Math.ceil(TableRebalanceProgressStats.calculatePercentageChange(
+        overallProgress._totalSegmentsToBeAdded + 
overallProgress._totalSegmentsToBeDeleted,
+        overallProgress._totalRemainingSegmentsToBeAdded + 
overallProgress._totalRemainingSegmentsToBeDeleted
+            + overallProgress._totalRemainingSegmentsToConverge + 
overallProgress._totalCarryOverSegmentsToBeAdded
+            + overallProgress._totalCarryOverSegmentsToBeDeleted));
+    _controllerMetrics.setValueOfTableGauge(_tableNameWithType, 
ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT,
+        progressPercent < 0 ? 0 : progressPercent);
+  }
+
+  /**
+   * Emits the rebalance progress as 100 (%) to the metrics. This ensures that the reported progress is
+   * consistent with the job being done, to avoid confusion.
+   */
+  private void emitProgressMetricDone() {
+    _controllerMetrics.setValueOfTableGauge(_tableNameWithType, 
ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT,
+        100);
+  }
+
   @VisibleForTesting
   TableRebalanceContext getTableRebalanceContext() {
     return _tableRebalanceContext;
@@ -297,6 +334,11 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
     return jobMetadata;
   }
 
+  @VisibleForTesting
+  TableRebalanceProgressStats getTableRebalanceProgressStats() {
+    return _tableRebalanceProgressStats;
+  }
+
   /**
    * Takes in targetState and sourceState and computes stats based on the 
comparison between sourceState and
    * targetState.This captures how far the source state is from the target 
state. Example - if there are 4 segments and
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index 7ae66a7b30..043269f037 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -24,10 +24,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
-import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import static 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING;
@@ -38,10 +38,170 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 
 public class TestZkBasedTableRebalanceObserver {
+  @Test
+  void testZkObserverProgressStats() {
+    PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
+    // Mocking this. We will verify using numZkUpdate stat
+    when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), 
any())).thenReturn(true);
+    ControllerMetrics controllerMetrics = ControllerMetrics.get();
+    TableRebalanceContext retryCtx = new TableRebalanceContext();
+    retryCtx.setConfig(new RebalanceConfig());
+    ZkBasedTableRebalanceObserver observer =
+        new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx, 
pinotHelixResourceManager);
+    Map<String, Map<String, String>> source = new TreeMap<>();
+    Map<String, Map<String, String>> target = new TreeMap<>();
+    Map<String, Map<String, String>> targetIntermediate = new TreeMap<>();
+    Map<String, Map<String, String>> sourceIntermediate = new TreeMap<>();
+    source.put("segment1", 
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1"), ONLINE));
+    source.put("segment2", 
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4"), ONLINE));
+    target.put("segment1",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host2", "host3"), ONLINE));
+    target.put("segment2",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", 
"host5", "host6"), ONLINE));
+    targetIntermediate.put("segment1",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host2"), ONLINE));
+    targetIntermediate.put("segment2",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4", 
"host5"), ONLINE));
+
+    sourceIntermediate.put("segment1",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host2"), ONLINE));
+    sourceIntermediate.put("segment2", 
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4"), ONLINE));
+
+    Set<String> segmentSet = new HashSet<>(source.keySet());
+    segmentSet.addAll(target.keySet());
+    TableRebalanceObserver.RebalanceContext rebalanceContext =
+        new TableRebalanceObserver.RebalanceContext(-1, segmentSet, 
segmentSet);
+    // START_TRIGGER will set up the ZK progress stats to have the diff 
between source and target. When calling the
+    // triggers for IS and EV-IS, since source and source are compared, the 
diff will change for the IS trigger
+    // but not for the EV-IS trigger, so ZK must be updated 1 extra time
+    observer.onTrigger(TableRebalanceObserver.Trigger.START_TRIGGER, source, 
target, rebalanceContext);
+    TableRebalanceProgressStats.RebalanceProgressStats overallStats =
+        
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+    TableRebalanceProgressStats.RebalanceProgressStats currentStepStats =
+        
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+    assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+    
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, 
source, target, rebalanceContext);
+    overallStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+    currentStepStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+    assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+    checkProgressPercentMetrics(controllerMetrics, observer);
+    // This simulates the first step of rebalance, where the IS is set to the 
intermediate assignment
+    
observer.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER,
 source, targetIntermediate,
+        rebalanceContext);
+    overallStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+    currentStepStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+    assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+    assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 2);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+    
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+        sourceIntermediate, targetIntermediate, rebalanceContext);
+    overallStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+    currentStepStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+    assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 3);
+    assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+    assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 1);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+    checkProgressPercentMetrics(controllerMetrics, observer);
+
+    // Assume bestEfforts=true and we didn't wait for the second segment to 
converge before moving to next step
+    // Here the currentAssignment is based on the IS and not the EV. IS is 
fully updated to the targetIntermediate
+    
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, 
targetIntermediate, target,
+        rebalanceContext);
+    overallStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+    currentStepStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+    assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 2);
+    assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+    assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 1);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+    checkProgressPercentMetrics(controllerMetrics, observer);
+    // Next assignment calculated based on the IS, IS should be same as the 
previous targetAssignment
+    
observer.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER,
 targetIntermediate, target,
+        rebalanceContext);
+    overallStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+    currentStepStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+    assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 2);
+    assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+    assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 2);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+    checkProgressPercentMetrics(controllerMetrics, observer);
+    
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+        sourceIntermediate, target, rebalanceContext);
+    overallStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+    currentStepStats = 
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsCurrentStep();
+    assertEquals(overallStats._totalSegmentsToBeAdded, 4);
+    assertEquals(overallStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 2);
+    assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 1);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalSegmentsToBeAdded, 2);
+    assertEquals(currentStepStats._totalSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeAdded, 2);
+    assertEquals(currentStepStats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeAdded, 1);
+    assertEquals(currentStepStats._totalCarryOverSegmentsToBeDeleted, 0);
+    checkProgressPercentMetrics(controllerMetrics, observer);
+  }
 
   // This is a test to verify if Zk stats are pushed out correctly
   @Test
@@ -49,7 +209,7 @@ public class TestZkBasedTableRebalanceObserver {
     PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
     // Mocking this. We will verify using numZkUpdate stat
     when(pinotHelixResourceManager.addControllerJobToZK(any(), any(), 
any())).thenReturn(true);
-    ControllerMetrics controllerMetrics = 
Mockito.mock(ControllerMetrics.class);
+    ControllerMetrics controllerMetrics = ControllerMetrics.get();
     TableRebalanceContext retryCtx = new TableRebalanceContext();
     retryCtx.setConfig(new RebalanceConfig());
     ZkBasedTableRebalanceObserver observer =
@@ -67,16 +227,21 @@ public class TestZkBasedTableRebalanceObserver {
         segmentSet, segmentSet);
     observer.onTrigger(TableRebalanceObserver.Trigger.START_TRIGGER, source, 
target, rebalanceContext);
     assertEquals(observer.getNumUpdatesToZk(), 1);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, 
source, source, rebalanceContext);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
 source, source,
         rebalanceContext);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     // START_TRIGGER will set up the ZK progress stats to have the diff 
between source and target. When calling the
     // triggers for IS and EV-IS, since source and source are compared, the 
diff will change for the IS trigger
     // but not for the EV-IS trigger, so ZK must be updated 1 extra time
     assertEquals(observer.getNumUpdatesToZk(), 2);
     
observer.onTrigger(TableRebalanceObserver.Trigger.IDEAL_STATE_CHANGE_TRIGGER, 
source, target, rebalanceContext);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     
observer.onTrigger(TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
 source, target,
         rebalanceContext);
+    checkProgressPercentMetrics(controllerMetrics, observer);
     // Both of the changes above will update ZK for progress stats
     assertEquals(observer.getNumUpdatesToZk(), 4);
     // Try a rollback and this should trigger a ZK update as well
@@ -84,6 +249,21 @@ public class TestZkBasedTableRebalanceObserver {
     assertEquals(observer.getNumUpdatesToZk(), 5);
   }
 
+  private void checkProgressPercentMetrics(ControllerMetrics controllerMetrics,
+      ZkBasedTableRebalanceObserver observer) {
+    Long progressGaugeValue =
+        
controllerMetrics.getGaugeValue(ControllerGauge.TABLE_REBALANCE_JOB_PROGRESS_PERCENT.getGaugeName()
 + ".dummy");
+    assertNotNull(progressGaugeValue);
+    TableRebalanceProgressStats.RebalanceProgressStats overallProgress =
+        
observer.getTableRebalanceProgressStats().getRebalanceProgressStatsOverall();
+    long progressRemained = (long) 
Math.ceil(TableRebalanceProgressStats.calculatePercentageChange(
+        overallProgress._totalSegmentsToBeAdded + 
overallProgress._totalSegmentsToBeDeleted,
+        overallProgress._totalRemainingSegmentsToBeAdded + 
overallProgress._totalRemainingSegmentsToBeDeleted
+            + overallProgress._totalRemainingSegmentsToConverge + 
overallProgress._totalCarryOverSegmentsToBeAdded
+            + overallProgress._totalCarryOverSegmentsToBeDeleted));
+    assertEquals(progressGaugeValue, progressRemained > 100 ? 0 : 100 - 
progressRemained);
+  }
+
   @Test
   void testDifferenceBetweenTableRebalanceStates() {
     Map<String, Map<String, String>> target = new TreeMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to