somandal commented on code in PR #15618:
URL: https://github.com/apache/pinot/pull/15618#discussion_r2074200163


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1300,50 +1301,91 @@ private IdealState waitForExternalViewToConverge(String 
tableNameWithType, boole
       long estimateAverageSegmentSizeInBytes, Set<String> 
allSegmentsFromIdealState,
       Logger tableRebalanceLogger)
       throws InterruptedException, TimeoutException {
-    long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
+    long startTimeMs = System.currentTimeMillis();
+    long endTimeMs = startTimeMs + externalViewStabilizationTimeoutInMs;
+    int extensionCount = 0;
 
     IdealState idealState;
-    do {
-      tableRebalanceLogger.debug("Start to check if ExternalView converges to 
IdealStates");
-      idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
-      // IdealState might be null if table got deleted, throwing exception to 
abort the rebalance
-      Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
-
-      ExternalView externalView =
-          
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
-      // ExternalView might be null when table is just created, skipping check 
for this iteration
-      if (externalView != null) {
-        // Record external view and ideal state convergence status
-        TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
-            estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
-        _tableRebalanceObserver.onTrigger(
-            
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
-            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
-        // Update unique segment list as IS-EV trigger must have processed 
these
-        allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
-        if (_tableRebalanceObserver.isStopped()) {
-          throw new RuntimeException(
-              String.format("Rebalance has already stopped with status: %s", 
_tableRebalanceObserver.getStopStatus()));
-        }
-        if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor, tableRebalanceLogger)) {
-          tableRebalanceLogger.info("ExternalView converged");
-          return idealState;
+    ExternalView externalView;
+    int previousRemainingSegments = -1;
+    tableRebalanceLogger.info("Waiting for ExternalView to converge, {} 
segments to monitor in current step",

Review Comment:
   nit: reword to "Starting EV-IS convergence check loop, {} segments to monitor in current step"



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1300,50 +1301,91 @@ private IdealState waitForExternalViewToConverge(String 
tableNameWithType, boole
       long estimateAverageSegmentSizeInBytes, Set<String> 
allSegmentsFromIdealState,
       Logger tableRebalanceLogger)
       throws InterruptedException, TimeoutException {
-    long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
+    long startTimeMs = System.currentTimeMillis();
+    long endTimeMs = startTimeMs + externalViewStabilizationTimeoutInMs;
+    int extensionCount = 0;
 
     IdealState idealState;
-    do {
-      tableRebalanceLogger.debug("Start to check if ExternalView converges to 
IdealStates");
-      idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
-      // IdealState might be null if table got deleted, throwing exception to 
abort the rebalance
-      Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
-
-      ExternalView externalView =
-          
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
-      // ExternalView might be null when table is just created, skipping check 
for this iteration
-      if (externalView != null) {
-        // Record external view and ideal state convergence status
-        TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
-            estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
-        _tableRebalanceObserver.onTrigger(
-            
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
-            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
-        // Update unique segment list as IS-EV trigger must have processed 
these
-        allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
-        if (_tableRebalanceObserver.isStopped()) {
-          throw new RuntimeException(
-              String.format("Rebalance has already stopped with status: %s", 
_tableRebalanceObserver.getStopStatus()));
-        }
-        if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor, tableRebalanceLogger)) {
-          tableRebalanceLogger.info("ExternalView converged");
-          return idealState;
+    ExternalView externalView;
+    int previousRemainingSegments = -1;
+    tableRebalanceLogger.info("Waiting for ExternalView to converge, {} 
segments to monitor in current step",
+        segmentsToMonitor.size());
+    while (true) {
+      do {
+        tableRebalanceLogger.debug("Start to check if ExternalView converges 
to IdealStates");
+        idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
+        // IdealState might be null if table got deleted, throwing exception 
to abort the rebalance
+        Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
+
+        externalView = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
+        // ExternalView might be null when table is just created, skipping 
check for this iteration
+        if (externalView != null) {
+          // Record external view and ideal state convergence status
+          TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
+              estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
+          _tableRebalanceObserver.onTrigger(
+              
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+              externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
+          // Update unique segment list as IS-EV trigger must have processed 
these
+          allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
+          if (_tableRebalanceObserver.isStopped()) {
+            throw new RuntimeException(
+                String.format("Rebalance has already stopped with status: %s",
+                    _tableRebalanceObserver.getStopStatus()));
+          }
+          if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
+              idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor,
+              tableRebalanceLogger)) {
+            tableRebalanceLogger.info("ExternalView converged in {}ms, with {} 
extensions",
+                System.currentTimeMillis() - startTimeMs, extensionCount);
+            return idealState;
+          }
+          if (previousRemainingSegments < 0) {
+            // initialize previousRemainingSegments
+            previousRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+                externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode,
+                bestEfforts, segmentsToMonitor, tableRebalanceLogger, false);
+            tableRebalanceLogger.info("Remaining {} segments to be 
processed.", previousRemainingSegments);
+          }
         }
+        tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
+            externalViewCheckIntervalInMs);
+        Thread.sleep(externalViewCheckIntervalInMs);
+      } while (System.currentTimeMillis() < endTimeMs);
+
+      if (bestEfforts) {
+        tableRebalanceLogger.warn(
+            "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
+            externalViewStabilizationTimeoutInMs);
+        return idealState;
       }
-      tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
-          externalViewCheckIntervalInMs);
-      Thread.sleep(externalViewCheckIntervalInMs);
-    } while (System.currentTimeMillis() < endTimeMs);
 
-    if (bestEfforts) {
-      tableRebalanceLogger.warn(
-          "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
-          externalViewStabilizationTimeoutInMs);
-      return idealState;
-    } else {
-      throw new TimeoutException(String.format("ExternalView has not converged 
within: %d ms",
-          externalViewStabilizationTimeoutInMs));
+      if (externalView == null) {
+        tableRebalanceLogger.warn("ExternalView is null, will not extend the 
EV stabilization timeout.");
+        throw new TimeoutException(
+            String.format("ExternalView is null, cannot wait for it to 
converge within %dms",
+                externalViewStabilizationTimeoutInMs));
+      }

Review Comment:
   nit: add an empty line after this for better code readability



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -485,7 +485,8 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
             externalViewCheckIntervalInMs, 
externalViewStabilizationTimeoutInMs, estimatedAverageSegmentSizeInBytes,
             allSegmentsFromIdealState, tableRebalanceLogger);
       } catch (Exception e) {
-        String errorMsg = "Caught exception while waiting for ExternalView to 
converge, aborting the rebalance";
+        String errorMsg =
+            "Caught exception while waiting for ExternalView to converge, 
aborting the rebalance: " + e.getMessage();

Review Comment:
   Why is `+ e.getMessage()` needed? Won't `tableRebalanceLogger.warn(errorMsg, e);` automatically print the exception message as well?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1300,50 +1301,91 @@ private IdealState waitForExternalViewToConverge(String 
tableNameWithType, boole
       long estimateAverageSegmentSizeInBytes, Set<String> 
allSegmentsFromIdealState,
       Logger tableRebalanceLogger)
       throws InterruptedException, TimeoutException {
-    long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
+    long startTimeMs = System.currentTimeMillis();
+    long endTimeMs = startTimeMs + externalViewStabilizationTimeoutInMs;
+    int extensionCount = 0;
 
     IdealState idealState;
-    do {
-      tableRebalanceLogger.debug("Start to check if ExternalView converges to 
IdealStates");
-      idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
-      // IdealState might be null if table got deleted, throwing exception to 
abort the rebalance
-      Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
-
-      ExternalView externalView =
-          
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
-      // ExternalView might be null when table is just created, skipping check 
for this iteration
-      if (externalView != null) {
-        // Record external view and ideal state convergence status
-        TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
-            estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
-        _tableRebalanceObserver.onTrigger(
-            
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
-            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
-        // Update unique segment list as IS-EV trigger must have processed 
these
-        allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
-        if (_tableRebalanceObserver.isStopped()) {
-          throw new RuntimeException(
-              String.format("Rebalance has already stopped with status: %s", 
_tableRebalanceObserver.getStopStatus()));
-        }
-        if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor, tableRebalanceLogger)) {
-          tableRebalanceLogger.info("ExternalView converged");
-          return idealState;
+    ExternalView externalView;
+    int previousRemainingSegments = -1;
+    tableRebalanceLogger.info("Waiting for ExternalView to converge, {} 
segments to monitor in current step",
+        segmentsToMonitor.size());
+    while (true) {
+      do {
+        tableRebalanceLogger.debug("Start to check if ExternalView converges 
to IdealStates");
+        idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
+        // IdealState might be null if table got deleted, throwing exception 
to abort the rebalance
+        Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
+
+        externalView = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
+        // ExternalView might be null when table is just created, skipping 
check for this iteration
+        if (externalView != null) {
+          // Record external view and ideal state convergence status
+          TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
+              estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
+          _tableRebalanceObserver.onTrigger(
+              
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+              externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
+          // Update unique segment list as IS-EV trigger must have processed 
these
+          allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
+          if (_tableRebalanceObserver.isStopped()) {
+            throw new RuntimeException(
+                String.format("Rebalance has already stopped with status: %s",
+                    _tableRebalanceObserver.getStopStatus()));
+          }
+          if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
+              idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor,
+              tableRebalanceLogger)) {
+            tableRebalanceLogger.info("ExternalView converged in {}ms, with {} 
extensions",
+                System.currentTimeMillis() - startTimeMs, extensionCount);
+            return idealState;
+          }
+          if (previousRemainingSegments < 0) {
+            // initialize previousRemainingSegments
+            previousRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+                externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode,
+                bestEfforts, segmentsToMonitor, tableRebalanceLogger, false);
+            tableRebalanceLogger.info("Remaining {} segments to be 
processed.", previousRemainingSegments);
+          }
         }
+        tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
+            externalViewCheckIntervalInMs);
+        Thread.sleep(externalViewCheckIntervalInMs);
+      } while (System.currentTimeMillis() < endTimeMs);
+
+      if (bestEfforts) {
+        tableRebalanceLogger.warn(
+            "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
+            externalViewStabilizationTimeoutInMs);
+        return idealState;
       }
-      tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
-          externalViewCheckIntervalInMs);
-      Thread.sleep(externalViewCheckIntervalInMs);
-    } while (System.currentTimeMillis() < endTimeMs);
 
-    if (bestEfforts) {
-      tableRebalanceLogger.warn(
-          "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
-          externalViewStabilizationTimeoutInMs);
-      return idealState;
-    } else {
-      throw new TimeoutException(String.format("ExternalView has not converged 
within: %d ms",
-          externalViewStabilizationTimeoutInMs));
+      if (externalView == null) {
+        tableRebalanceLogger.warn("ExternalView is null, will not extend the 
EV stabilization timeout.");
+        throw new TimeoutException(
+            String.format("ExternalView is null, cannot wait for it to 
converge within %dms",
+                externalViewStabilizationTimeoutInMs));
+      }
+      int currentRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+          externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts,
+          segmentsToMonitor, tableRebalanceLogger, false);
+
+      // It is possible that remainingSegments increases so that 
currentRemainingSegments > previousRemainingSegments,
+      // likely due to CONSUMING segments committing, where the state of the 
segment change to ONLINE. Therefore, if
+      // the segment had converged, it then becomes un-converged and thus 
increases the count.
+      if (currentRemainingSegments >= previousRemainingSegments) {
+        throw new TimeoutException(
+            String.format(
+                "ExternalView has not made progress for the last %dms, timeout 
after spending %dms waiting (%d "
+                    + "extensions)", externalViewStabilizationTimeoutInMs, 
System.currentTimeMillis() - startTimeMs,
+                extensionCount));
+      }
+      tableRebalanceLogger.info(
+          "Extending EV stabilization timeout for another {}ms, remaining {} 
segments to be processed.",

Review Comment:
   Can we also log the extensionCount here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1300,50 +1301,91 @@ private IdealState waitForExternalViewToConverge(String 
tableNameWithType, boole
       long estimateAverageSegmentSizeInBytes, Set<String> 
allSegmentsFromIdealState,
       Logger tableRebalanceLogger)
       throws InterruptedException, TimeoutException {
-    long endTimeMs = System.currentTimeMillis() + 
externalViewStabilizationTimeoutInMs;
+    long startTimeMs = System.currentTimeMillis();
+    long endTimeMs = startTimeMs + externalViewStabilizationTimeoutInMs;
+    int extensionCount = 0;
 
     IdealState idealState;
-    do {
-      tableRebalanceLogger.debug("Start to check if ExternalView converges to 
IdealStates");
-      idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
-      // IdealState might be null if table got deleted, throwing exception to 
abort the rebalance
-      Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
-
-      ExternalView externalView =
-          
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
-      // ExternalView might be null when table is just created, skipping check 
for this iteration
-      if (externalView != null) {
-        // Record external view and ideal state convergence status
-        TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
-            estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
-        _tableRebalanceObserver.onTrigger(
-            
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
-            externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
-        // Update unique segment list as IS-EV trigger must have processed 
these
-        allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
-        if (_tableRebalanceObserver.isStopped()) {
-          throw new RuntimeException(
-              String.format("Rebalance has already stopped with status: %s", 
_tableRebalanceObserver.getStopStatus()));
-        }
-        if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
-            idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor, tableRebalanceLogger)) {
-          tableRebalanceLogger.info("ExternalView converged");
-          return idealState;
+    ExternalView externalView;
+    int previousRemainingSegments = -1;
+    tableRebalanceLogger.info("Waiting for ExternalView to converge, {} 
segments to monitor in current step",
+        segmentsToMonitor.size());
+    while (true) {
+      do {
+        tableRebalanceLogger.debug("Start to check if ExternalView converges 
to IdealStates");
+        idealState = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType));
+        // IdealState might be null if table got deleted, throwing exception 
to abort the rebalance
+        Preconditions.checkState(idealState != null, "Failed to find the 
IdealState");
+
+        externalView = 
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType));
+        // ExternalView might be null when table is just created, skipping 
check for this iteration
+        if (externalView != null) {
+          // Record external view and ideal state convergence status
+          TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
+              estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, 
segmentsToMonitor);
+          _tableRebalanceObserver.onTrigger(
+              
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+              externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), rebalanceContext);
+          // Update unique segment list as IS-EV trigger must have processed 
these
+          allSegmentsFromIdealState = 
idealState.getRecord().getMapFields().keySet();
+          if (_tableRebalanceObserver.isStopped()) {
+            throw new RuntimeException(
+                String.format("Rebalance has already stopped with status: %s",
+                    _tableRebalanceObserver.getStopStatus()));
+          }
+          if (isExternalViewConverged(tableNameWithType, 
externalView.getRecord().getMapFields(),
+              idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, 
segmentsToMonitor,
+              tableRebalanceLogger)) {
+            tableRebalanceLogger.info("ExternalView converged in {}ms, with {} 
extensions",
+                System.currentTimeMillis() - startTimeMs, extensionCount);
+            return idealState;
+          }
+          if (previousRemainingSegments < 0) {
+            // initialize previousRemainingSegments
+            previousRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+                externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode,
+                bestEfforts, segmentsToMonitor, tableRebalanceLogger, false);
+            tableRebalanceLogger.info("Remaining {} segments to be 
processed.", previousRemainingSegments);
+          }
         }
+        tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
+            externalViewCheckIntervalInMs);
+        Thread.sleep(externalViewCheckIntervalInMs);
+      } while (System.currentTimeMillis() < endTimeMs);
+
+      if (bestEfforts) {
+        tableRebalanceLogger.warn(
+            "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
+            externalViewStabilizationTimeoutInMs);
+        return idealState;
       }
-      tableRebalanceLogger.debug("ExternalView has not converged to 
IdealStates. Retry after: {}ms",
-          externalViewCheckIntervalInMs);
-      Thread.sleep(externalViewCheckIntervalInMs);
-    } while (System.currentTimeMillis() < endTimeMs);
 
-    if (bestEfforts) {
-      tableRebalanceLogger.warn(
-          "ExternalView has not converged within: {}ms, continuing the 
rebalance (best-efforts)",
-          externalViewStabilizationTimeoutInMs);
-      return idealState;
-    } else {
-      throw new TimeoutException(String.format("ExternalView has not converged 
within: %d ms",
-          externalViewStabilizationTimeoutInMs));
+      if (externalView == null) {
+        tableRebalanceLogger.warn("ExternalView is null, will not extend the 
EV stabilization timeout.");
+        throw new TimeoutException(
+            String.format("ExternalView is null, cannot wait for it to 
converge within %dms",
+                externalViewStabilizationTimeoutInMs));
+      }
+      int currentRemainingSegments = 
getNumRemainingSegmentsToProcess(tableNameWithType,
+          externalView.getRecord().getMapFields(), 
idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts,
+          segmentsToMonitor, tableRebalanceLogger, false);
+
+      // It is possible that remainingSegments increases so that 
currentRemainingSegments > previousRemainingSegments,
+      // likely due to CONSUMING segments committing, where the state of the 
segment change to ONLINE. Therefore, if
+      // the segment had converged, it then becomes un-converged and thus 
increases the count.
+      if (currentRemainingSegments >= previousRemainingSegments) {
+        throw new TimeoutException(
+            String.format(
+                "ExternalView has not made progress for the last %dms, timeout 
after spending %dms waiting (%d "
+                    + "extensions)", externalViewStabilizationTimeoutInMs, 
System.currentTimeMillis() - startTimeMs,
+                extensionCount));
+      }

Review Comment:
   nit: add an empty line after this for better code readability



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -1352,26 +1394,58 @@ static boolean isExternalViewConverged(String 
tableNameWithType,
       Map<String, Map<String, String>> externalViewSegmentStates,
       Map<String, Map<String, String>> idealStateSegmentStates, boolean 
lowDiskMode, boolean bestEfforts,
       @Nullable Set<String> segmentsToMonitor) {
-    return isExternalViewConverged(tableNameWithType, 
externalViewSegmentStates, idealStateSegmentStates, lowDiskMode,
-        bestEfforts, segmentsToMonitor, LOGGER);
+    return getNumRemainingSegmentsToProcess(tableNameWithType, 
externalViewSegmentStates, idealStateSegmentStates,
+        lowDiskMode, bestEfforts, segmentsToMonitor, LOGGER, true) == 0;
   }
 
   /**
-   * NOTE:
-   * Only check the segments in the IdealState and being monitored. Extra 
segments in ExternalView are ignored because
-   * they are not managed by the rebalancer.
-   * For each segment checked:
-   * - In regular mode, it is okay to have extra instances in ExternalView as 
long as the instance states in IdealState
-   *   are reached.
-   * - In low disk mode, instance states in ExternalView must match IdealState 
to ensure the segments are deleted from
-   *   server before moving to the next assignment.
-   * For ERROR state in ExternalView, if using best-efforts, log a warning and 
treat it as good state; if not, throw an
-   * exception to abort the rebalance because we are not able to get out of 
the ERROR state.
+   * Check if the external view has converged to the ideal state. See 
`getNumRemainingSegmentsToProcess` for details on
+   * how the convergence is determined.
    */
   private static boolean isExternalViewConverged(String tableNameWithType,
       Map<String, Map<String, String>> externalViewSegmentStates,
       Map<String, Map<String, String>> idealStateSegmentStates, boolean 
lowDiskMode, boolean bestEfforts,
       @Nullable Set<String> segmentsToMonitor, Logger tableRebalanceLogger) {
+    return getNumRemainingSegmentsToProcess(tableNameWithType, 
externalViewSegmentStates, idealStateSegmentStates,
+        lowDiskMode, bestEfforts, segmentsToMonitor, tableRebalanceLogger, 
true) == 0;
+  }
+
+  @VisibleForTesting
+  static int getNumRemainingSegmentsToProcess(String tableNameWithType,
+      Map<String, Map<String, String>> externalViewSegmentStates,
+      Map<String, Map<String, String>> idealStateSegmentStates, boolean 
lowDiskMode, boolean bestEfforts,
+      @Nullable Set<String> segmentsToMonitor) {
+    return getNumRemainingSegmentsToProcess(tableNameWithType, 
externalViewSegmentStates, idealStateSegmentStates,
+        lowDiskMode, bestEfforts, segmentsToMonitor, LOGGER, false);
+  }
+
+  /**
+   * Count the number of segments that are not in the expected state. If 
`earlyReturn=true` it returns as soon as the
+   * count becomes non-zero (effectively return 1). This is used to check 
whether the ExternalView has converged to

Review Comment:
   nit: remove "effectively return 1". Instead, add a note explaining that it returns the number of segments processed so far.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to