This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 79da40671c Fix internal logic of pre-check minimizeDataMovement (#15527) 79da40671c is described below commit 79da40671c5feb04f4ddab5c2e7948b0e759805d Author: Jhow <44998515+j-howhu...@users.noreply.github.com> AuthorDate: Fri Apr 11 20:00:51 2025 -0700 Fix internal logic of pre-check minimizeDataMovement (#15527) * fix internal logic of pre-check minimizeDataMovement --- .../core/rebalance/DefaultRebalancePreChecker.java | 55 ++++++++++++++++------ 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java index 62be8dff80..ea429990d0 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java @@ -130,7 +130,7 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { Map<String, JsonNode> needsReloadMetadata = needsReloadMetadataPair.getServerReloadJsonResponses(); int failedResponses = needsReloadMetadataPair.getNumFailedResponses(); LOGGER.info("Received {} needs reload responses and {} failed responses from servers for table: {} with " - + "rebalanceJobId: {}, number of servers queried: {}", needsReloadMetadata.size(), failedResponses, + + "rebalanceJobId: {}, number of servers queried: {}", needsReloadMetadata.size(), failedResponses, tableNameWithType, rebalanceJobId, currentlyAssignedServers.size()); needsReload = needsReloadMetadata.values().stream().anyMatch(value -> value.get("needReload").booleanValue()); if (!needsReload && failedResponses > 0) { @@ -183,21 +183,22 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { instanceAssignmentConfigConsuming = InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.CONSUMING); } - // For REALTIME tables need to check for both CONSUMING and COMPLETED segments if relocation is enabled + // For REALTIME tables if COMPLETED segments are not to be relocated, check for only CONSUMING segment instance + // assignment config if presents if (!InstanceAssignmentConfigUtils.shouldRelocateCompletedSegments(tableConfig)) { - RebalancePreCheckerResult rebalancePreCheckerResult; if (isInstanceAssignmentAllowedConsuming) { if (rebalanceConfig.getMinimizeDataMovement() == RebalanceConfig.MinimizeDataMovementOptions.ENABLE) { return RebalancePreCheckerResult.pass("minimizeDataMovement is enabled"); } if (instanceAssignmentConfigConsuming.isMinimizeDataMovement()) { return rebalanceConfig.getMinimizeDataMovement() == RebalanceConfig.MinimizeDataMovementOptions.DISABLE - ? RebalancePreCheckerResult.warn("minimizeDataMovement is enabled in table config but it's overridden " - + "with disabled") + ? RebalancePreCheckerResult.warn( + "minimizeDataMovement is enabled for CONSUMING segments in table config but it's overridden " + + "with disabled") : RebalancePreCheckerResult.pass("minimizeDataMovement is enabled"); } - return RebalancePreCheckerResult.warn("minimizeDataMovement is not enabled but instance assignment is " - + "allowed"); + return RebalancePreCheckerResult.warn( + "minimizeDataMovement is not enabled for CONSUMING segments but instance assignment is allowed"); } return RebalancePreCheckerResult.pass("Instance assignment not allowed, no need for minimizeDataMovement"); } @@ -210,7 +211,8 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { InstanceAssignmentConfigUtils.getInstanceAssignmentConfig(tableConfig, InstancePartitionsType.COMPLETED); } - RebalancePreCheckerResult rebalancePreCheckerResult; + // COMPLETED segments are to be relocated, check both COMPLETED and CONSUMING segment instance assignment config + // that present if (!isInstanceAssignmentAllowedConsuming && !isInstanceAssignmentAllowedCompleted) { return RebalancePreCheckerResult.pass("Instance assignment not allowed, no need for minimizeDataMovement"); } else if (instanceAssignmentConfigConsuming != null && instanceAssignmentConfigCompleted != null) { @@ -220,16 +222,41 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { if (instanceAssignmentConfigCompleted.isMinimizeDataMovement() && instanceAssignmentConfigConsuming.isMinimizeDataMovement()) { return rebalanceConfig.getMinimizeDataMovement() == RebalanceConfig.MinimizeDataMovementOptions.DISABLE - ? RebalancePreCheckerResult.warn("minimizeDataMovement is enabled in table config but it's overridden " - + "with disabled") + ? RebalancePreCheckerResult.warn( + "minimizeDataMovement is enabled for both COMPLETED and CONSUMING segments in table config but it's " + + "overridden with disabled") + : RebalancePreCheckerResult.pass("minimizeDataMovement is enabled"); + } + return RebalancePreCheckerResult.warn( + "minimizeDataMovement is not enabled for either or both COMPLETED and CONSUMING segments, but instance " + + "assignment is allowed for both"); + } else if (instanceAssignmentConfigConsuming != null) { + if (rebalanceConfig.getMinimizeDataMovement() == RebalanceConfig.MinimizeDataMovementOptions.ENABLE) { + return RebalancePreCheckerResult.pass("minimizeDataMovement is enabled"); + } + if (instanceAssignmentConfigConsuming.isMinimizeDataMovement()) { + return rebalanceConfig.getMinimizeDataMovement() == RebalanceConfig.MinimizeDataMovementOptions.DISABLE + ? RebalancePreCheckerResult.warn( + "minimizeDataMovement is enabled for CONSUMING segments in table config but it's overridden with " + + "disabled") + : RebalancePreCheckerResult.pass("minimizeDataMovement is enabled"); + } + return RebalancePreCheckerResult.warn( + "minimizeDataMovement is not enabled for CONSUMING segments, but instance assignment is allowed"); + } else { + if (rebalanceConfig.getMinimizeDataMovement() == RebalanceConfig.MinimizeDataMovementOptions.ENABLE) { + return RebalancePreCheckerResult.pass("minimizeDataMovement is enabled"); + } + if (instanceAssignmentConfigCompleted.isMinimizeDataMovement()) { + return rebalanceConfig.getMinimizeDataMovement() == RebalanceConfig.MinimizeDataMovementOptions.DISABLE + ? RebalancePreCheckerResult.warn( + "minimizeDataMovement is enabled for COMPLETED segments in table config but it's overridden " + + "with disabled") : RebalancePreCheckerResult.pass("minimizeDataMovement is enabled"); } return RebalancePreCheckerResult.warn( - "minimizeDataMovement may not be enabled for consuming or completed, but instance assigment is allowed " - + "for both"); + "minimizeDataMovement is not enabled for COMPLETED segments, but instance assignment is allowed"); } - return RebalancePreCheckerResult.warn("minimizeDataMovement may not enabled for " - + "consuming or completed but instance assignment is allowed for at least one"); } catch (IllegalStateException e) { LOGGER.warn("Error while trying to fetch instance assignment config, assuming minimizeDataMovement is false", e); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org