somandal commented on code in PR #15618: URL: https://github.com/apache/pinot/pull/15618#discussion_r2076083881
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -1300,50 +1301,91 @@ private IdealState waitForExternalViewToConverge(String tableNameWithType, boole long estimateAverageSegmentSizeInBytes, Set<String> allSegmentsFromIdealState, Logger tableRebalanceLogger) throws InterruptedException, TimeoutException { - long endTimeMs = System.currentTimeMillis() + externalViewStabilizationTimeoutInMs; + long startTimeMs = System.currentTimeMillis(); + long endTimeMs = startTimeMs + externalViewStabilizationTimeoutInMs; + int extensionCount = 0; IdealState idealState; - do { - tableRebalanceLogger.debug("Start to check if ExternalView converges to IdealStates"); - idealState = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType)); - // IdealState might be null if table got deleted, throwing exception to abort the rebalance - Preconditions.checkState(idealState != null, "Failed to find the IdealState"); - - ExternalView externalView = - _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType)); - // ExternalView might be null when table is just created, skipping check for this iteration - if (externalView != null) { - // Record external view and ideal state convergence status - TableRebalanceObserver.RebalanceContext rebalanceContext = new TableRebalanceObserver.RebalanceContext( - estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, segmentsToMonitor); - _tableRebalanceObserver.onTrigger( - TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER, - externalView.getRecord().getMapFields(), idealState.getRecord().getMapFields(), rebalanceContext); - // Update unique segment list as IS-EV trigger must have processed these - allSegmentsFromIdealState = idealState.getRecord().getMapFields().keySet(); - if (_tableRebalanceObserver.isStopped()) { - throw new RuntimeException( - 
String.format("Rebalance has already stopped with status: %s", _tableRebalanceObserver.getStopStatus())); - } - if (isExternalViewConverged(tableNameWithType, externalView.getRecord().getMapFields(), - idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, segmentsToMonitor, tableRebalanceLogger)) { - tableRebalanceLogger.info("ExternalView converged"); - return idealState; + ExternalView externalView; + int previousRemainingSegments = -1; + tableRebalanceLogger.info("Waiting for ExternalView to converge, {} segments to monitor in current step", + segmentsToMonitor.size()); + while (true) { + do { + tableRebalanceLogger.debug("Start to check if ExternalView converges to IdealStates"); + idealState = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().idealStates(tableNameWithType)); + // IdealState might be null if table got deleted, throwing exception to abort the rebalance + Preconditions.checkState(idealState != null, "Failed to find the IdealState"); + + externalView = _helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().externalView(tableNameWithType)); + // ExternalView might be null when table is just created, skipping check for this iteration + if (externalView != null) { + // Record external view and ideal state convergence status + TableRebalanceObserver.RebalanceContext rebalanceContext = new TableRebalanceObserver.RebalanceContext( + estimateAverageSegmentSizeInBytes, allSegmentsFromIdealState, segmentsToMonitor); + _tableRebalanceObserver.onTrigger( + TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER, + externalView.getRecord().getMapFields(), idealState.getRecord().getMapFields(), rebalanceContext); + // Update unique segment list as IS-EV trigger must have processed these + allSegmentsFromIdealState = idealState.getRecord().getMapFields().keySet(); + if (_tableRebalanceObserver.isStopped()) { + throw new RuntimeException( + String.format("Rebalance has already stopped with status: %s", + 
_tableRebalanceObserver.getStopStatus())); + } + if (isExternalViewConverged(tableNameWithType, externalView.getRecord().getMapFields(), + idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, segmentsToMonitor, + tableRebalanceLogger)) { + tableRebalanceLogger.info("ExternalView converged in {}ms, with {} extensions", + System.currentTimeMillis() - startTimeMs, extensionCount); + return idealState; + } + if (previousRemainingSegments < 0) { + // initialize previousRemainingSegments + previousRemainingSegments = getNumRemainingSegmentsToProcess(tableNameWithType, + externalView.getRecord().getMapFields(), idealState.getRecord().getMapFields(), lowDiskMode, + bestEfforts, segmentsToMonitor, tableRebalanceLogger, false); + tableRebalanceLogger.info("Remaining {} segments to be processed.", previousRemainingSegments); + } } + tableRebalanceLogger.debug("ExternalView has not converged to IdealStates. Retry after: {}ms", + externalViewCheckIntervalInMs); + Thread.sleep(externalViewCheckIntervalInMs); + } while (System.currentTimeMillis() < endTimeMs); + + if (bestEfforts) { + tableRebalanceLogger.warn( + "ExternalView has not converged within: {}ms, continuing the rebalance (best-efforts)", + externalViewStabilizationTimeoutInMs); + return idealState; } - tableRebalanceLogger.debug("ExternalView has not converged to IdealStates. 
Retry after: {}ms", - externalViewCheckIntervalInMs); - Thread.sleep(externalViewCheckIntervalInMs); - } while (System.currentTimeMillis() < endTimeMs); - if (bestEfforts) { - tableRebalanceLogger.warn( - "ExternalView has not converged within: {}ms, continuing the rebalance (best-efforts)", - externalViewStabilizationTimeoutInMs); - return idealState; - } else { - throw new TimeoutException(String.format("ExternalView has not converged within: %d ms", - externalViewStabilizationTimeoutInMs)); + if (externalView == null) { + tableRebalanceLogger.warn("ExternalView is null, will not extend the EV stabilization timeout."); + throw new TimeoutException( + String.format("ExternalView is null, cannot wait for it to converge within %dms", + externalViewStabilizationTimeoutInMs)); + } + int currentRemainingSegments = getNumRemainingSegmentsToProcess(tableNameWithType, + externalView.getRecord().getMapFields(), idealState.getRecord().getMapFields(), lowDiskMode, bestEfforts, + segmentsToMonitor, tableRebalanceLogger, false); + + // It is possible that remainingSegments increases so that currentRemainingSegments > previousRemainingSegments, + // likely due to CONSUMING segments committing, where the state of the segment change to ONLINE. Therefore, if + // the segment had converged, it then becomes un-converged and thus increases the count. + if (currentRemainingSegments >= previousRemainingSegments) { + throw new TimeoutException( + String.format( + "ExternalView has not made progress for the last %dms, timeout after spending %dms waiting (%d " + + "extensions)", externalViewStabilizationTimeoutInMs, System.currentTimeMillis() - startTimeMs, + extensionCount)); + } + tableRebalanceLogger.info( + "Extending EV stabilization timeout for another {}ms, remaining {} segments to be processed.", Review Comment: Wouldn't it be useful, for debugging purposes, to log which extension iteration we are on? A sequence number in the log message is easier to read than finding all the extension log lines and counting them 
(assuming the wait hasn't completed yet, so the total number of iterations isn't known). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org