raghavyadav01 commented on code in PR #15266:
URL: https://github.com/apache/pinot/pull/15266#discussion_r2023466733


##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java:
##########
@@ -1264,116 +1264,95 @@ public void testIsExternalViewConverged() {
     boolean[] falseAndTrue = new boolean[]{false, true};
 
     // Empty segment states should match
-    for (boolean lowDiskMode : falseAndTrue) {
-      for (boolean bestEfforts : falseAndTrue) {
-        assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
-            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
-      }
+    for (boolean bestEfforts : falseAndTrue) {
+      assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+          idealStateSegmentStates, bestEfforts, null));
     }
 
     // Do not check segment that does not exist in IdealState
     Map<String, String> instanceStateMap = new TreeMap<>();
     instanceStateMap.put("instance1", ONLINE);
     externalViewSegmentStates.put("segment1", instanceStateMap);
-    for (boolean lowDiskMode : falseAndTrue) {
-      for (boolean bestEfforts : falseAndTrue) {
-        assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
-            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
-      }
+    for (boolean bestEfforts : falseAndTrue) {
+      assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+          idealStateSegmentStates, bestEfforts, null));
     }
 
     // Do not check segment that is OFFLINE in IdealState
     instanceStateMap = new TreeMap<>();
     instanceStateMap.put("instance1", OFFLINE);
     idealStateSegmentStates.put("segment2", instanceStateMap);
-    for (boolean lowDiskMode : falseAndTrue) {
-      for (boolean bestEfforts : falseAndTrue) {
-        assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
-            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
-      }
+    for (boolean bestEfforts : falseAndTrue) {
+      assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+          idealStateSegmentStates, bestEfforts, null));
     }
 
     // Should fail when a segment has CONSUMING instance in IdealState but 
does not exist in ExternalView
     instanceStateMap.put("instance2", CONSUMING);
-    for (boolean lowDiskMode : falseAndTrue) {
-      for (boolean bestEfforts : falseAndTrue) {
-        assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
-            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
-      }
+    for (boolean bestEfforts : falseAndTrue) {
+      assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+          idealStateSegmentStates, bestEfforts, null));
     }
 
     // Should fail when instance state does not exist
     instanceStateMap = new TreeMap<>();
     externalViewSegmentStates.put("segment2", instanceStateMap);
-    for (boolean lowDiskMode : falseAndTrue) {
-      for (boolean bestEfforts : falseAndTrue) {
-        assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
-            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
-      }
+    for (boolean bestEfforts : falseAndTrue) {
+      assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+          idealStateSegmentStates, bestEfforts, null));
     }
 
     // Should fail when instance state does not match
     instanceStateMap.put("instance2", OFFLINE);
-    for (boolean lowDiskMode : falseAndTrue) {
-      for (boolean bestEfforts : falseAndTrue) {
-        assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
-            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
-      }
+    for (boolean bestEfforts : falseAndTrue) {
+      assertFalse(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+          idealStateSegmentStates, bestEfforts, null));
     }
 
     // Should pass when instance state matches
     instanceStateMap.put("instance2", CONSUMING);
-    for (boolean lowDiskMode : falseAndTrue) {
-      for (boolean bestEfforts : falseAndTrue) {
-        assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
-            idealStateSegmentStates, lowDiskMode, bestEfforts, null));
-      }
+    for (boolean bestEfforts : falseAndTrue) {
+      assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+          idealStateSegmentStates, bestEfforts, null));
     }
 
-    // When there are extra instances in ExternalView, should pass in regular 
mode but fail in low disk mode
+    // When there are extra instances in ExternalView, should fail (always 
wait for extra instances to be removed)
     instanceStateMap.put("instance3", CONSUMING);
     for (boolean bestEfforts : falseAndTrue) {
-      assertTrue(
-          TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-              false, bestEfforts, null));
       assertFalse(
           TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-              true, bestEfforts, null));
+              bestEfforts, null));
     }
 
     // When instance state is ERROR in ExternalView, should fail in regular 
mode but pass in best-efforts mode
     instanceStateMap.put("instance2", ERROR);
     instanceStateMap.remove("instance3");
-    for (boolean lowDiskMode : falseAndTrue) {
-      try {
-        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-            lowDiskMode, false, null);
-        fail();
-      } catch (Exception e) {
-        // Expected
-      }
-      assertTrue(
-          TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
-              lowDiskMode, true, null));
+    try {
+      TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
+          false, null);
+      fail();
+    } catch (Exception e) {
+      // Expected
     }
+    assertTrue(
+        TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates, idealStateSegmentStates,
+            true, null));
 
-    // When the extra instance is in ERROR state, should throw exception in 
low disk mode when best-efforts is disabled
+    // When the extra instance is in ERROR state, should throw exception when 
best-efforts is disabled
     instanceStateMap.put("instance2", CONSUMING);
     instanceStateMap.put("instance3", ERROR);
-    for (boolean lowDiskMode : falseAndTrue) {
-      for (boolean bestEfforts : falseAndTrue) {
-        if (lowDiskMode && !bestEfforts) {
-          try {
-            TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
-                idealStateSegmentStates, true, false, null);
-            fail();
-          } catch (Exception e) {
-            // Expected
-          }
-        } else {
-          assertTrue(TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
-              idealStateSegmentStates, lowDiskMode, bestEfforts, null));
+    for (boolean bestEfforts : falseAndTrue) {
+      if (!bestEfforts) {
+        try {
+          TableRebalancer.isExternalViewConverged(offlineTableName, 
externalViewSegmentStates,
+              idealStateSegmentStates, false, null);
+          fail();
+        } catch (Exception e) {
+          // Expected

Review Comment:
   Would there be value in adding a log here? It might flood the logs if it 
is too aggressive.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -68,31 +70,67 @@ public ZkBasedTableRebalanceObserver(String 
tableNameWithType, String rebalanceJ
 
   @Override
   public void onTrigger(Trigger trigger, Map<String, Map<String, String>> 
currentState,
-      Map<String, Map<String, String>> targetState) {
+      Map<String, Map<String, String>> targetState, RebalanceContext 
rebalanceContext) {
     boolean updatedStatsInZk = false;
     _controllerMetrics.setValueOfTableGauge(_tableNameWithType, 
ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 1);
+    TableRebalanceProgressStats.RebalanceStateStats latest;
+    TableRebalanceProgressStats.RebalanceProgressStats latestProgress;
     switch (trigger) {
       case START_TRIGGER:
-        updateOnStart(currentState, targetState);
+        updateOnStart(currentState, targetState, rebalanceContext);
         trackStatsInZk();
         updatedStatsInZk = true;
         break;
       // Write to Zk if there's change since previous stats computation
       case IDEAL_STATE_CHANGE_TRIGGER:
-        TableRebalanceProgressStats.RebalanceStateStats latest =
-            getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        latest = getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        latestProgress = calculateOverallProgressStats(targetState,
+            currentState, rebalanceContext, 
Trigger.IDEAL_STATE_CHANGE_TRIGGER, _tableRebalanceProgressStats);
         if 
(TableRebalanceProgressStats.statsDiffer(_tableRebalanceProgressStats.getCurrentToTargetConvergence(),
-            latest)) {
-          _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
+            latest) || TableRebalanceProgressStats.progressStatsDiffer(
+            _tableRebalanceProgressStats.getRebalanceProgressStatsOverall(), 
latestProgress)) {
+          if (TableRebalanceProgressStats.statsDiffer(
+              
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) 
{
+            _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);

Review Comment:
   By "regressed" I mean: can the stats go backwards or become negative?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -271,17 +315,279 @@ public static 
TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         String instanceName = instanceStateEntry.getKey();
         String sourceInstanceState = sourceInstanceStateMap.get(instanceName);
         if (!targetStateInstanceState.equals(sourceInstanceState)) {
-          rebalanceStats._replicasToRebalance++;
+          rebalanceStats._totalSegmentsToRebalance++;
           hasSegmentConverged = false;
         }
       }
       if (!hasSegmentConverged) {
-        rebalanceStats._segmentsToRebalance++;
+        rebalanceStats._uniqueSegmentsToRebalance++;
       }
     }
     int totalSegments = targetState.size();
-    rebalanceStats._percentSegmentsToRebalance =
-        (totalSegments == 0) ? 0 : ((double) 
rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
+    rebalanceStats._percentRemainingSegmentsToRebalance =
+        (totalSegments == 0) ? 0 : ((double) 
rebalanceStats._uniqueSegmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Updates the overall progress stats based on the current step's progress 
stats. This should be called
+   * during the EV-IS convergence trigger to ensure the overall stats reflect 
the changes as they are made.
+   * @param rebalanceProgressStats the rebalance stats
+   * @param lastStepStats step level stats from the last iteration
+   * @param latestStepStats latest step level stats calculated in this 
iteration
+   * @return the newly calculated overall progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats 
updateOverallProgressStatsFromStep(
+      TableRebalanceProgressStats rebalanceProgressStats,
+      TableRebalanceProgressStats.RebalanceProgressStats lastStepStats,
+      TableRebalanceProgressStats.RebalanceProgressStats latestStepStats) {
+    int numAdditionalSegmentsAdded =
+        latestStepStats._totalSegmentsToBeAdded - 
lastStepStats._totalSegmentsToBeAdded;
+    int numAdditionalSegmentsDeleted =
+        latestStepStats._totalSegmentsToBeDeleted - 
lastStepStats._totalSegmentsToBeDeleted;
+    int numSegmentAddsProcessedInLastStep = 
Math.abs(lastStepStats._totalRemainingSegmentsToBeAdded
+        - latestStepStats._totalRemainingSegmentsToBeAdded);
+    int numSegmentDeletesProcessedInLastStep = 
Math.abs(lastStepStats._totalRemainingSegmentsToBeDeleted
+        - latestStepStats._totalRemainingSegmentsToBeDeleted);
+    int numberNewUntrackedSegmentsAdded = 
latestStepStats._totalUniqueNewUntrackedSegmentsDuringRebalance
+        - lastStepStats._totalUniqueNewUntrackedSegmentsDuringRebalance;
+
+    TableRebalanceProgressStats.RebalanceProgressStats overallProgressStats =
+        rebalanceProgressStats.getRebalanceProgressStatsOverall();
+
+    TableRebalanceProgressStats.RebalanceProgressStats newOverallProgressStats 
=
+        new TableRebalanceProgressStats.RebalanceProgressStats();
+
+    newOverallProgressStats._totalSegmentsToBeAdded = 
overallProgressStats._totalSegmentsToBeAdded
+        + numAdditionalSegmentsAdded;
+    newOverallProgressStats._totalSegmentsToBeDeleted = 
overallProgressStats._totalSegmentsToBeDeleted
+        + numAdditionalSegmentsDeleted;
+    newOverallProgressStats._totalRemainingSegmentsToBeAdded = 
numAdditionalSegmentsAdded == 0
+        ? overallProgressStats._totalRemainingSegmentsToBeAdded - 
numSegmentAddsProcessedInLastStep
+        : overallProgressStats._totalRemainingSegmentsToBeAdded + 
numSegmentAddsProcessedInLastStep;
+    newOverallProgressStats._totalRemainingSegmentsToBeDeleted = 
numAdditionalSegmentsDeleted == 0
+        ? overallProgressStats._totalRemainingSegmentsToBeDeleted - 
numSegmentDeletesProcessedInLastStep
+        : overallProgressStats._totalRemainingSegmentsToBeAdded + 
numSegmentDeletesProcessedInLastStep;
+    newOverallProgressStats._totalRemainingSegmentsToConverge = 
latestStepStats._totalRemainingSegmentsToConverge;
+    newOverallProgressStats._totalUniqueNewUntrackedSegmentsDuringRebalance =
+        overallProgressStats._totalUniqueNewUntrackedSegmentsDuringRebalance + 
numberNewUntrackedSegmentsAdded;
+    newOverallProgressStats._percentageTotalSegmentsAddsRemaining =
+        
calculatePercentageChange(newOverallProgressStats._totalSegmentsToBeAdded,

Review Comment:
   Thanks. Looks good. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to