This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 31e94d3b18 Add a dry-run summary mode for TableRebalance which only returns a summary of the dry-run results (#15050) 31e94d3b18 is described below commit 31e94d3b180a9a0aeda033040020a8beebacf9e6 Author: Sonam Mandal <sonam.man...@startree.ai> AuthorDate: Wed Feb 19 13:37:12 2025 -0800 Add a dry-run summary mode for TableRebalance which only returns a summary of the dry-run results (#15050) * Add a dry-run summary mode for TableRebalance which only returns a summary of the dry-run results * Address review - add server level lists of added, removed, unchanged, and servers getting new segments --- .../pinot/controller/BaseControllerStarter.java | 1 + .../api/resources/PinotTableRestletResource.java | 10 +- .../helix/core/PinotHelixResourceManager.java | 18 +- .../core/rebalance/DefaultRebalancePreChecker.java | 3 +- .../helix/core/rebalance/RebalanceConfig.java | 22 +- .../helix/core/rebalance/RebalanceResult.java | 11 +- .../core/rebalance/RebalanceSummaryResult.java | 325 ++++++++++++++++++++ .../helix/core/rebalance/TableRebalancer.java | 228 ++++++++++++-- .../rebalance/tenant/DefaultTenantRebalancer.java | 5 +- .../rebalance/tenant/TenantRebalanceResult.java | 2 +- .../TableRebalancerClusterStatelessTest.java | 342 ++++++++++++++++++++- .../tests/OfflineClusterIntegrationTest.java | 208 ++++++++++++- 12 files changed, 1126 insertions(+), 49 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 5761cb27e5..9a4d080c7c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -512,6 +512,7 @@ public abstract class BaseControllerStarter implements ServiceStartable { _tableSizeReader = new TableSizeReader(_executorService, _connectionManager, _controllerMetrics, _helixResourceManager, _leadControllerManager); + _helixResourceManager.registerTableSizeReader(_tableSizeReader); _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, _controllerMetrics, _leadControllerManager, _helixResourceManager, _config); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java index 8930c2f643..8815d20d06 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java @@ -606,6 +606,9 @@ public class PinotTableRestletResource { @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr, @ApiParam(value = "Whether to rebalance table in dry-run mode") @DefaultValue("false") @QueryParam("dryRun") boolean dryRun, + @ApiParam(value = "Whether to return dry-run summary instead of full, dry-run must be enabled to use this") + @DefaultValue("false") @QueryParam("summary") + boolean summary, @ApiParam(value = "Whether to enable pre-checks for table, must be in dry-run mode to enable") @DefaultValue("false") @QueryParam("preChecks") boolean preChecks, @ApiParam(value = "Whether to reassign instances before reassigning segments") @DefaultValue("false") @@ -648,6 +651,7 @@ public class PinotTableRestletResource { String tableNameWithType = constructTableNameWithType(tableName, tableTypeStr); RebalanceConfig rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(dryRun); + rebalanceConfig.setSummary(summary); rebalanceConfig.setPreChecks(preChecks); rebalanceConfig.setReassignInstances(reassignInstances); rebalanceConfig.setIncludeConsuming(includeConsuming); @@ -668,7 +672,7 @@ public class PinotTableRestletResource { String rebalanceJobId = TableRebalancer.createUniqueRebalanceJobIdentifier(); try { - if (dryRun || preChecks || downtime) { + if (dryRun || summary || preChecks || downtime) { // For dry-run, preChecks or rebalance with downtime, directly return the rebalance result as it should return // immediately return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig, rebalanceJobId, false); @@ -689,7 +693,7 @@ public class PinotTableRestletResource { String errorMsg = String.format("Caught exception/error while rebalancing table: %s", tableNameWithType); LOGGER.error(errorMsg, t); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, - null); + null, null); } }); boolean isJobIdPersisted = waitForRebalanceToPersist( @@ -710,7 +714,7 @@ public class PinotTableRestletResource { return new RebalanceResult(dryRunResult.getJobId(), RebalanceResult.Status.IN_PROGRESS, "In progress, check controller logs for updates", dryRunResult.getInstanceAssignment(), dryRunResult.getTierInstanceAssignment(), dryRunResult.getSegmentAssignment(), - dryRunResult.getPreChecksResult()); + dryRunResult.getPreChecksResult(), dryRunResult.getRebalanceSummaryResult()); } else { // If dry-run failed or is no-op, return the dry-run result return dryRunResult; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 2c31458fcd..fd36bf44ea 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -159,6 +159,7 @@ import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver; import org.apache.pinot.controller.helix.starter.HelixConfig; +import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.DatabaseConfig; import org.apache.pinot.spi.config.instance.Instance; @@ -241,6 +242,7 @@ public class PinotHelixResourceManager { private TableCache _tableCache; private final LineageManager _lineageManager; private final RebalancePreChecker _rebalancePreChecker; + private TableSizeReader _tableSizeReader; public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, @@ -449,6 +451,15 @@ public class PinotHelixResourceManager { } /** + * Get the table size reader. + * + * @return table size reader + */ + public TableSizeReader getTableSizeReader() { + return _tableSizeReader; + } + +/** * Instance related APIs */ @@ -1943,6 +1954,10 @@ public class PinotHelixResourceManager { _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager; } + public void registerTableSizeReader(TableSizeReader tableSizeReader) { + _tableSizeReader = tableSizeReader; + } + private void assignInstances(TableConfig tableConfig, boolean override) { String tableNameWithType = tableConfig.getTableName(); String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); @@ -3612,7 +3627,8 @@ public class PinotHelixResourceManager { tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, tableConfig); } TableRebalancer tableRebalancer = - new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics, _rebalancePreChecker); + new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, _controllerMetrics, _rebalancePreChecker, + _tableSizeReader); return tableRebalancer.rebalance(tableConfig, rebalanceConfig, rebalanceJobId, tierToSegmentsMap); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java index 945a98e48e..b6a89208ee 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java @@ -53,8 +53,7 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker { } @Override - public Map<String, String> check(String rebalanceJobId, String tableNameWithType, - TableConfig tableConfig) { + public Map<String, String> check(String rebalanceJobId, String tableNameWithType, TableConfig tableConfig) { LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", tableNameWithType, rebalanceJobId); Map<String, String> preCheckResult = new HashMap<>(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java index e5bd39f0ec..06a9e171c3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java @@ -34,6 +34,11 @@ public class RebalanceConfig { @ApiModelProperty(example = "false") private boolean _dryRun = false; + // Whether to return only dry-run summary instead of full dry-run output, can only be used in dry-run mode + @JsonProperty("summary") + @ApiModelProperty(example = "false") + private boolean _summary = false; + // Whether to perform pre-checks for rebalance. This only returns the status of each pre-check and does not fail // rebalance @JsonProperty("preChecks") @@ -124,6 +129,14 @@ public class RebalanceConfig { _dryRun = dryRun; } + public boolean isSummary() { + return _summary; + } + + public void setSummary(boolean summary) { + _summary = summary; + } + public boolean isPreChecks() { return _preChecks; } @@ -246,10 +259,10 @@ public class RebalanceConfig { @Override public String toString() { - return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" + _preChecks + ", _reassignInstances=" - + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" + _bootstrap - + ", _downtime=" + _downtime + ", _minAvailableReplicas=" + _minAvailableReplicas + ", _bestEfforts=" - + _bestEfforts + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs + return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", _summary=" + _summary + ", preChecks=" + _preChecks + + ", _reassignInstances=" + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", _bootstrap=" + + _bootstrap + ", _downtime=" + _downtime + ", _minAvailableReplicas=" + _minAvailableReplicas + + ", _bestEfforts=" + _bestEfforts + ", _externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs + ", _externalViewStabilizationTimeoutInMs=" + _externalViewStabilizationTimeoutInMs + ", _updateTargetTier=" + _updateTargetTier + ", _heartbeatIntervalInMs=" + _heartbeatIntervalInMs + ", _heartbeatTimeoutInMs=" + _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ", _retryInitialDelayInMs=" @@ -259,6 +272,7 @@ public class RebalanceConfig { public static RebalanceConfig copy(RebalanceConfig cfg) { RebalanceConfig rc = new RebalanceConfig(); rc._dryRun = cfg._dryRun; + rc._summary = cfg._summary; rc._preChecks = cfg._preChecks; rc._reassignInstances = cfg._reassignInstances; rc._includeConsuming = cfg._includeConsuming; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java index 2b81d8d78b..ac768a8c46 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java @@ -42,6 +42,8 @@ public class RebalanceResult { private final String _description; @JsonInclude(JsonInclude.Include.NON_NULL) private final Map<String, String> _preChecksResult; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final RebalanceSummaryResult _rebalanceSummaryResult; @JsonCreator public RebalanceResult(@JsonProperty(value = "jobId", required = true) String jobId, @@ -50,7 +52,8 @@ public class RebalanceResult { @JsonProperty("instanceAssignment") @Nullable Map<InstancePartitionsType, InstancePartitions> instanceAssignment, @JsonProperty("tierInstanceAssignment") @Nullable Map<String, InstancePartitions> tierInstanceAssignment, @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, String>> segmentAssignment, - @JsonProperty("preChecksResult") @Nullable Map<String, String> preChecksResult) { + @JsonProperty("preChecksResult") @Nullable Map<String, String> preChecksResult, + @JsonProperty("rebalanceSummaryResult") @Nullable RebalanceSummaryResult rebalanceSummaryResult) { _jobId = jobId; _status = status; _description = description; @@ -58,6 +61,7 @@ public class RebalanceResult { _tierInstanceAssignment = tierInstanceAssignment; _segmentAssignment = segmentAssignment; _preChecksResult = preChecksResult; + _rebalanceSummaryResult = rebalanceSummaryResult; } @JsonProperty @@ -95,6 +99,11 @@ public class RebalanceResult { return _preChecksResult; } + @JsonProperty + public RebalanceSummaryResult getRebalanceSummaryResult() { + return _rebalanceSummaryResult; + } + public enum Status { // FAILED if the job has ended with known exceptions; // ABORTED if the job is stopped by others but retry is still allowed; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java new file mode 100644 index 0000000000..3169c9b367 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + + +/** + * Holds the summary data of the rebalance result + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class RebalanceSummaryResult { + + @JsonInclude(JsonInclude.Include.NON_NULL) + private final ServerInfo _serverInfo; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final SegmentInfo _segmentInfo; + + /** + * Constructor for RebalanceSummaryResult + * @param serverInfo server related summary information + * @param segmentInfo segment related summary information + */ + @JsonCreator + public RebalanceSummaryResult(@JsonProperty("serverInfo") @Nullable ServerInfo serverInfo, + @JsonProperty("segmentInfo") @Nullable SegmentInfo segmentInfo) { + _serverInfo = serverInfo; + _segmentInfo = segmentInfo; + } + + @JsonProperty + public ServerInfo getServerInfo() { + return _serverInfo; + } + + @JsonProperty + public SegmentInfo getSegmentInfo() { + return _segmentInfo; + } + + public static class ServerSegmentChangeInfo { + private final ServerStatus _serverStatus; + private final int _totalSegmentsAfterRebalance; + private final int _totalSegmentsBeforeRebalance; + private final int _segmentsAdded; + private final int _segmentsDeleted; + private final int _segmentsUnchanged; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final List<String> _tagList; + + /** + * Constructor for ServerSegmentChangeInfo + * @param serverStatus server status, whether it was added, removed, or unchanged as part of this rebalance + * @param totalSegmentsAfterRebalance expected total segments on this server after rebalance + * @param totalSegmentsBeforeRebalance current number of segments on this server before rebalance + * @param segmentsAdded number of segments expected to be added as part of this rebalance + * @param segmentsDeleted number of segments expected to be deleted as part of this rebalance + * @param segmentsUnchanged number of segments that aren't moving from this server as part of this rebalance + * @param tagList server tag list + */ + @JsonCreator + public ServerSegmentChangeInfo(@JsonProperty("serverStatus") ServerStatus serverStatus, + @JsonProperty("totalSegmentsAfterRebalance") int totalSegmentsAfterRebalance, + @JsonProperty("totalSegmentsBeforeRebalance") int totalSegmentsBeforeRebalance, + @JsonProperty("segmentsAdded") int segmentsAdded, @JsonProperty("segmentsDeleted") int segmentsDeleted, + @JsonProperty("segmentsUnchanged") int segmentsUnchanged, + @JsonProperty("tagList") @Nullable List<String> tagList) { + _serverStatus = serverStatus; + _totalSegmentsAfterRebalance = totalSegmentsAfterRebalance; + _totalSegmentsBeforeRebalance = totalSegmentsBeforeRebalance; + _segmentsAdded = segmentsAdded; + _segmentsDeleted = segmentsDeleted; + _segmentsUnchanged = segmentsUnchanged; + _tagList = tagList; + } + + @JsonProperty + public ServerStatus getServerStatus() { + return _serverStatus; + } + + @JsonProperty + public int getTotalSegmentsAfterRebalance() { + return _totalSegmentsAfterRebalance; + } + + @JsonProperty + public int getTotalSegmentsBeforeRebalance() { + return _totalSegmentsBeforeRebalance; + } + + @JsonProperty + public int getSegmentsAdded() { + return _segmentsAdded; + } + + @JsonProperty + public int getSegmentsDeleted() { + return _segmentsDeleted; + } + + @JsonProperty + public int getSegmentsUnchanged() { + return _segmentsUnchanged; + } + + @JsonProperty + public List<String> getTagList() { + return _tagList; + } + } + + public static class RebalanceChangeInfo { + private final int _valueBeforeRebalance; + private final int _expectedValueAfterRebalance; + + /** + * Constructor for RebalanceChangeInfo + * @param valueBeforeRebalance current value before rebalance + * @param expectedValueAfterRebalance expected value after rebalance + */ + @JsonCreator + public RebalanceChangeInfo(@JsonProperty("valueBeforeRebalance") int valueBeforeRebalance, + @JsonProperty("expectedValueAfterRebalance") int expectedValueAfterRebalance) { + _valueBeforeRebalance = valueBeforeRebalance; + _expectedValueAfterRebalance = expectedValueAfterRebalance; + } + + @JsonProperty + public int getValueBeforeRebalance() { + return _valueBeforeRebalance; + } + + @JsonProperty + public int getExpectedValueAfterRebalance() { + return _expectedValueAfterRebalance; + } + } + + public static class ServerInfo { + private final int _numServersGettingNewSegments; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final RebalanceChangeInfo _numServers; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Set<String> _serversAdded; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Set<String> _serversRemoved; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Set<String> _serversUnchanged; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Set<String> _serversGettingNewSegments; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Map<String, ServerSegmentChangeInfo> _serverSegmentChangeInfo; + + /** + * Constructor for ServerInfo + * @param numServersGettingNewSegments total number of servers receiving new segments as part of this rebalance + * @param numServers number of servers before and after this rebalance + * @param serversAdded set of servers getting added as part of this rebalance + * @param serversRemoved set of servers getting removed as part of this rebalance + * @param serversUnchanged set of servers existing both before and as part of this rebalance + * @param serversGettingNewSegments set of servers getting segments added + * @param serverSegmentChangeInfo per server statistics for this rebalance + */ + @JsonCreator + public ServerInfo(@JsonProperty("numServersGettingNewSegments") int numServersGettingNewSegments, + @JsonProperty("numServers") @Nullable RebalanceChangeInfo numServers, + @JsonProperty("serversAdded") @Nullable Set<String> serversAdded, + @JsonProperty("serversRemoved") @Nullable Set<String> serversRemoved, + @JsonProperty("serversUnchanged") @Nullable Set<String> serversUnchanged, + @JsonProperty("serversGettingNewSegments") @Nullable Set<String> serversGettingNewSegments, + @JsonProperty("serverSegmentChangeInfo") + @Nullable Map<String, ServerSegmentChangeInfo> serverSegmentChangeInfo) { + _numServersGettingNewSegments = numServersGettingNewSegments; + _numServers = numServers; + _serversAdded = serversAdded; + _serversRemoved = serversRemoved; + _serversUnchanged = serversUnchanged; + _serversGettingNewSegments = serversGettingNewSegments; + _serverSegmentChangeInfo = serverSegmentChangeInfo; + } + + @JsonProperty + public int getNumServersGettingNewSegments() { + return _numServersGettingNewSegments; + } + + @JsonProperty + public RebalanceChangeInfo getNumServers() { + return _numServers; + } + + @JsonProperty + public Set<String> getServersAdded() { + return _serversAdded; + } + + @JsonProperty + public Set<String> getServersRemoved() { + return _serversRemoved; + } + + @JsonProperty + public Set<String> getServersUnchanged() { + return _serversUnchanged; + } + + @JsonProperty + public Set<String> getServersGettingNewSegments() { + return _serversGettingNewSegments; + } + + @JsonProperty + public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() { + return _serverSegmentChangeInfo; + } + } + + public static class SegmentInfo { + // TODO: Add a metric to estimate the total time it will take to rebalance + private final int _totalSegmentsToBeMoved; + private final int _maxSegmentsAddedToASingleServer; + private final long _estimatedAverageSegmentSizeInBytes; + private final long _totalEstimatedDataToBeMovedInBytes; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final RebalanceChangeInfo _replicationFactor; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final RebalanceChangeInfo _numSegmentsInSingleReplica; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final RebalanceChangeInfo _numSegmentsAcrossAllReplicas; + + /** + * Constructor for SegmentInfo + * @param totalSegmentsToBeMoved total number of segments to be moved as part of this rebalance + * @param maxSegmentsAddedToASingleServer maximum segments added to a single server as part of this rebalance + * @param estimatedAverageSegmentSizeInBytes estimated average size of segments in bytes + * @param totalEstimatedDataToBeMovedInBytes total estimated amount of data to be moved as part of this rebalance + * @param replicationFactor replication factor before and after this rebalance + * @param numSegmentsInSingleReplica number of segments in single replica before and after this rebalance + * @param numSegmentsAcrossAllReplicas total number of segments across all replicas before and after this rebalance + */ + @JsonCreator + public SegmentInfo(@JsonProperty("totalSegmentsToBeMoved") int totalSegmentsToBeMoved, + @JsonProperty("maxSegmentsAddedToASingleServer") int maxSegmentsAddedToASingleServer, + @JsonProperty("estimatedAverageSegmentSizeInBytes") long estimatedAverageSegmentSizeInBytes, + @JsonProperty("totalEstimatedDataToBeMovedInBytes") long totalEstimatedDataToBeMovedInBytes, + @JsonProperty("replicationFactor") @Nullable RebalanceChangeInfo replicationFactor, + @JsonProperty("numSegmentsInSingleReplica") @Nullable RebalanceChangeInfo numSegmentsInSingleReplica, + @JsonProperty("numSegmentsAcrossAllReplicas") @Nullable RebalanceChangeInfo numSegmentsAcrossAllReplicas) { + _totalSegmentsToBeMoved = totalSegmentsToBeMoved; + _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer; + _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes; + _totalEstimatedDataToBeMovedInBytes = totalEstimatedDataToBeMovedInBytes; + _replicationFactor = replicationFactor; + _numSegmentsInSingleReplica = numSegmentsInSingleReplica; + _numSegmentsAcrossAllReplicas = numSegmentsAcrossAllReplicas; + } + + @JsonProperty + public int getTotalSegmentsToBeMoved() { + return _totalSegmentsToBeMoved; + } + + @JsonProperty + public int getMaxSegmentsAddedToASingleServer() { + return _maxSegmentsAddedToASingleServer; + } + + @JsonProperty + public long getEstimatedAverageSegmentSizeInBytes() { + return _estimatedAverageSegmentSizeInBytes; + } + + @JsonProperty + public long getTotalEstimatedDataToBeMovedInBytes() { + return _totalEstimatedDataToBeMovedInBytes; + } + + @JsonProperty + public RebalanceChangeInfo getReplicationFactor() { + return _replicationFactor; + } + + @JsonProperty + public RebalanceChangeInfo getNumSegmentsInSingleReplica() { + return _numSegmentsInSingleReplica; + } + + @JsonProperty + public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() { + return _numSegmentsAcrossAllReplicas; + } + } + + public enum ServerStatus { + // ADDED if the server is newly added as part of rebalance; + // REMOVED if the server is removed as part of rebalance; + // UNCHANGED if the server status is unchanged as part of rebalance; + ADDED, REMOVED, UNCHANGED + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 275994e57a..b5d56f2ba6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -43,11 +43,13 @@ import org.apache.helix.HelixManager; import org.apache.helix.PropertyKey; import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; +import org.apache.pinot.common.exception.InvalidConfigException; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.metrics.ControllerTimer; import org.apache.pinot.common.tier.PinotServerTierStorage; @@ -60,6 +62,7 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; import org.apache.pinot.controller.helix.core.assignment.segment.StrictRealtimeSegmentAssignment; +import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.RoutingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -120,9 +123,11 @@ public class TableRebalancer { private final TableRebalanceObserver _tableRebalanceObserver; private final ControllerMetrics _controllerMetrics; private final RebalancePreChecker _rebalancePreChecker; + private final TableSizeReader _tableSizeReader; public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver, - @Nullable ControllerMetrics controllerMetrics, @Nullable RebalancePreChecker rebalancePreChecker) { + @Nullable ControllerMetrics controllerMetrics, @Nullable RebalancePreChecker rebalancePreChecker, + @Nullable TableSizeReader tableSizeReader) { _helixManager = helixManager; if (tableRebalanceObserver != null) { _tableRebalanceObserver = tableRebalanceObserver; @@ -132,10 +137,11 @@ public class TableRebalancer { _helixDataAccessor = helixManager.getHelixDataAccessor(); _controllerMetrics = controllerMetrics; _rebalancePreChecker = rebalancePreChecker; + _tableSizeReader = tableSizeReader; } public TableRebalancer(HelixManager helixManager) { - this(helixManager, null, null, null); + this(helixManager, null, null, null, null); } public static String createUniqueRebalanceJobIdentifier() { @@ -175,6 +181,7 @@ public class TableRebalancer { rebalanceJobId = createUniqueRebalanceJobIdentifier(); } boolean dryRun = rebalanceConfig.isDryRun(); + boolean summary = rebalanceConfig.isSummary(); boolean preChecks = rebalanceConfig.isPreChecks(); boolean reassignInstances = rebalanceConfig.isReassignInstances(); boolean includeConsuming = rebalanceConfig.isIncludeConsuming(); @@ -189,14 +196,19 @@ public class TableRebalancer { && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase( tableConfig.getRoutingConfig().getInstanceSelectorType()); LOGGER.info( - "Start rebalancing table: {} with dryRun: {}, preChecks: {}, reassignInstances: {}, includeConsuming: {}, " - + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, enableStrictReplicaGroup: {}, " - + "lowDiskMode: {}, bestEfforts: {}, externalViewCheckIntervalInMs: {}, " + "Start rebalancing table: {} with dryRun: {}, summary: {}, preChecks: {}, reassignInstances: {}, " + + "includeConsuming: {}, bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: {}, " + + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, externalViewCheckIntervalInMs: {}, " + "externalViewStabilizationTimeoutInMs: {}", - tableNameWithType, dryRun, preChecks, reassignInstances, includeConsuming, bootstrap, downtime, + tableNameWithType, dryRun, summary, preChecks, reassignInstances, includeConsuming, bootstrap, downtime, minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, lowDiskMode, bestEfforts, externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs); + if (summary && !dryRun) { + return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, + "Must enable dry-run mode to use summary mode", null, null, null, null, null); + } + // Perform pre-checks if enabled Map<String, String> preChecksResult = null; if (preChecks) { @@ -205,7 +217,8 @@ public class TableRebalancer { String errorMsg = String.format("Pre-checks can only be enabled in dry-run mode, not triggering rebalance for " + "table: %s with rebalanceJobId: %s", tableNameWithType, rebalanceJobId); LOGGER.error(errorMsg); - return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, null); + return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, errorMsg, null, null, null, null, + null); } if (_rebalancePreChecker != null) { preChecksResult = _rebalancePreChecker.check(rebalanceJobId, tableNameWithType, tableConfig); @@ -222,21 +235,21 @@ public class TableRebalancer { "For rebalanceId: %s, caught exception while fetching IdealState for table: %s, aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, - "Caught exception while fetching IdealState: " + e, null, null, null, preChecksResult); + "Caught exception while fetching IdealState: " + e, null, null, null, preChecksResult, null); } if (currentIdealState == null) { onReturnFailure( String.format("For rebalanceId: %s, cannot find the IdealState for table: %s, aborting the rebalance", rebalanceJobId, tableNameWithType), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Cannot find the IdealState for table", - null, null, null, preChecksResult); + null, null, null, preChecksResult, null); } if (!currentIdealState.isEnabled() && !downtime) { onReturnFailure(String.format( "For rebalanceId: %s, cannot rebalance disabled table: %s without downtime, aborting the rebalance", rebalanceJobId, tableNameWithType), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, - "Cannot rebalance disabled table without downtime", null, null, null, preChecksResult); + "Cannot rebalance disabled table without downtime", null, null, null, preChecksResult, null); } LOGGER.info("For rebalanceId: {}, processing instance partitions for table: {}", rebalanceJobId, tableNameWithType); @@ -254,7 +267,8 @@ public class TableRebalancer { "For rebalanceId: %s, caught exception while fetching/calculating instance partitions for table: %s, " + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, - "Caught exception while fetching/calculating instance partitions: " + e, null, null, null, preChecksResult); + "Caught exception while fetching/calculating instance partitions: " + e, null, null, null, preChecksResult, + null); } // Calculate instance partitions for tiers if configured @@ -273,7 +287,7 @@ public class TableRebalancer { + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while fetching/calculating tier instance partitions: " + e, null, null, null, - preChecksResult); + preChecksResult, null); } LOGGER.info("For rebalanceId: {}, calculating the target assignment for table: {}", rebalanceJobId, @@ -291,7 +305,7 @@ public class TableRebalancer { + "rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while calculating target assignment: " + e, instancePartitionsMap, - tierToInstancePartitionsMap, null, preChecksResult); + tierToInstancePartitionsMap, null, preChecksResult, null); } boolean segmentAssignmentUnchanged = currentAssignment.equals(targetAssignment); @@ -299,6 +313,13 @@ public class TableRebalancer { + "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId, instancePartitionsUnchanged, tierInstancePartitionsUnchanged, segmentAssignmentUnchanged, tableNameWithType); + RebalanceSummaryResult summaryResult = null; + if (summary) { + // Calculate summary here itself so that even if the table is already balanced, the caller can verify whether that + // is expected or not based on the summary results + summaryResult = calculateDryRunSummary(currentAssignment, targetAssignment, tableNameWithType, rebalanceJobId); + } + if (segmentAssignmentUnchanged) { LOGGER.info("Table: {} is already balanced", tableNameWithType); if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) { @@ -306,28 +327,38 @@ public class TableRebalancer { String.format("For rebalanceId: %s, instance unchanged and table: %s is already balanced", rebalanceJobId, tableNameWithType)); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.NO_OP, "Table is already balanced", - instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult); + summary ? null : instancePartitionsMap, summary ? null : tierToInstancePartitionsMap, + summary ? null : targetAssignment, preChecksResult, summaryResult); } else { if (dryRun) { return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, - "Instance reassigned in dry-run mode, table is already balanced", instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment, preChecksResult); + "Instance reassigned in dry-run mode, table is already balanced", + summary ? null : instancePartitionsMap, summary ? null : tierToInstancePartitionsMap, + summary ? null : targetAssignment, preChecksResult, summaryResult); } else { _tableRebalanceObserver.onSuccess( String.format("For rebalanceId: %s, instance reassigned but table: %s is already balanced", rebalanceJobId, tableNameWithType)); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, - "Instance reassigned, table is already balanced", instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment, preChecksResult); + "Instance reassigned, table is already balanced", summary ? null : instancePartitionsMap, + summary ? null : tierToInstancePartitionsMap, summary ? null : targetAssignment, preChecksResult, + summaryResult); } } } + if (summary) { + LOGGER.info("For rebalanceId: {}, rebalancing table: {} in dry-run summary mode, returning the summary only", + rebalanceJobId, tableNameWithType); + return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Dry-run summary mode", null, + null, null, preChecksResult, summaryResult); + } + if (dryRun) { LOGGER.info("For rebalanceId: {}, rebalancing table: {} in dry-run mode, returning the target assignment", rebalanceJobId, tableNameWithType); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Dry-run mode", instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment, preChecksResult); + tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult); } if (downtime) { @@ -352,14 +383,14 @@ public class TableRebalancer { return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Success with downtime (replaced IdealState with the target segment assignment, ExternalView might not " + "reach the target segment assignment yet)", instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment, preChecksResult); + targetAssignment, preChecksResult, summaryResult); } catch (Exception e) { onReturnFailure(String.format( "For rebalanceId: %s, caught exception while updating IdealState for table: %s, aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment, preChecksResult); + targetAssignment, preChecksResult, summaryResult); } } @@ -391,7 +422,7 @@ public class TableRebalancer { minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas), null); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Illegal min available replicas config", instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment, preChecksResult); + targetAssignment, preChecksResult, summaryResult); } minAvailableReplicas = minReplicasToKeepUpForNoDowntime; } else { @@ -432,12 +463,12 @@ public class TableRebalancer { if (_tableRebalanceObserver.isStopped()) { return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(), "Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment, preChecksResult); + tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult); } _tableRebalanceObserver.onError(errorMsg); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while waiting for ExternalView to converge: " + e, instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment, preChecksResult); + tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult); } // Re-calculate the target assignment if IdealState changed while waiting for ExternalView to converge @@ -486,7 +517,7 @@ public class TableRebalancer { + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while re-calculating the target assignment: " + e, instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment, preChecksResult); + tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult); } } else { LOGGER.info("For rebalanceId:{}, no state change found for segments to be moved, " @@ -510,7 +541,7 @@ public class TableRebalancer { return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, "Success with minAvailableReplicas: " + minAvailableReplicas + " (both IdealState and ExternalView should reach the target segment assignment)", - instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult); + instancePartitionsMap, tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult); } // Record change of current ideal state and the new target @@ -519,7 +550,7 @@ public class TableRebalancer { if (_tableRebalanceObserver.isStopped()) { return new RebalanceResult(rebalanceJobId, _tableRebalanceObserver.getStopStatus(), "Rebalance has stopped already before updating the IdealState", instancePartitionsMap, - tierToInstancePartitionsMap, targetAssignment, preChecksResult); + tierToInstancePartitionsMap, targetAssignment, preChecksResult, summaryResult); } Map<String, Map<String, String>> nextAssignment = getNextAssignment(currentAssignment, targetAssignment, minAvailableReplicas, enableStrictReplicaGroup, @@ -550,7 +581,7 @@ public class TableRebalancer { + "aborting the rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while updating IdealState: " + e, instancePartitionsMap, tierToInstancePartitionsMap, - targetAssignment, preChecksResult); + targetAssignment, preChecksResult, summaryResult); } segmentsToMonitor = new HashSet<>(segmentsToMove); @@ -559,6 +590,147 @@ public class TableRebalancer { } } + private long calculateTableSizePerReplicaInBytes(String tableNameWithType) { + long tableSizePerReplicaInBytes = -1; + if (_tableSizeReader == null) { + LOGGER.warn("tableSizeReader is null, cannot calculate table size for table {}!", tableNameWithType); + return tableSizePerReplicaInBytes; + } + LOGGER.info("Fetching the table size for rebalance summary for table: {}", tableNameWithType); + try { + // TODO: Consider making the timeoutMs for fetching table size via table rebalancer configurable + TableSizeReader.TableSubTypeSizeDetails tableSizeDetails = + _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000); + tableSizePerReplicaInBytes = tableSizeDetails._reportedSizePerReplicaInBytes; + } catch (InvalidConfigException e) { + LOGGER.error("Caught exception while trying to fetch table size details for table: {}", tableNameWithType, e); + } + LOGGER.info("Fetched the table size (per replica size: {}) for rebalance summary for table: {}", + tableSizePerReplicaInBytes, tableNameWithType); + return tableSizePerReplicaInBytes; + } + + private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, String>> currentAssignment, + Map<String, Map<String, String>> targetAssignment, String tableNameWithType, String rebalanceJobId) { + LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}", + tableNameWithType, rebalanceJobId); + int existingReplicationFactor = 0; + int newReplicationFactor = 0; + Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>(); + Map<String, Set<String>> newServersToSegmentMap = new HashMap<>(); + + for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) { + existingReplicationFactor = entrySet.getValue().size(); + for (String segmentKey : entrySet.getValue().keySet()) { + existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey()); + } + } + + for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) { + newReplicationFactor = entrySet.getValue().size(); + for (String segmentKey : entrySet.getValue().keySet()) { + newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey()); + } + } + RebalanceSummaryResult.RebalanceChangeInfo replicationFactor + = new RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, newReplicationFactor); + + int existingNumServers = existingServersToSegmentMap.size(); + int newNumServers = newServersToSegmentMap.size(); + RebalanceSummaryResult.RebalanceChangeInfo numServers + = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers, newNumServers); + + List<InstanceConfig> instanceConfigs = _helixDataAccessor.getChildValues( + _helixDataAccessor.keyBuilder().instanceConfigs(), true); + Map<String, List<String>> instanceToTagsMap = new HashMap<>(); + for (InstanceConfig instanceConfig : instanceConfigs) { + instanceToTagsMap.put(instanceConfig.getInstanceName(), instanceConfig.getTags()); + } + + Set<String> serversAdded = new HashSet<>(); + Set<String> serversRemoved = new HashSet<>(); + Set<String> serversUnchanged = new HashSet<>(); + Set<String> serversGettingNewSegments = new HashSet<>(); + Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>(); + int segmentsNotMoved = 0; + int maxSegmentsAddedToServer = 0; + for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) { + String server = entry.getKey(); + Set<String> segmentSet = entry.getValue(); + int totalNewSegments = segmentSet.size(); + + Set<String> newSegmentSet = new HashSet<>(segmentSet); + Set<String> existingSegmentSet = new HashSet<>(); + int segmentsUnchanged = 0; + int totalExistingSegments = 0; + RebalanceSummaryResult.ServerStatus serverStatus = RebalanceSummaryResult.ServerStatus.ADDED; + if (existingServersToSegmentMap.containsKey(server)) { + Set<String> segmentSetForServer = existingServersToSegmentMap.get(server); + totalExistingSegments = segmentSetForServer.size(); + existingSegmentSet.addAll(segmentSetForServer); + Set<String> intersection = new HashSet<>(segmentSetForServer); + intersection.retainAll(newSegmentSet); + segmentsUnchanged = intersection.size(); + segmentsNotMoved += segmentsUnchanged; + serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED; + serversUnchanged.add(server); + } else { + serversAdded.add(server); + } + newSegmentSet.removeAll(existingSegmentSet); + int segmentsAdded = newSegmentSet.size(); + if (segmentsAdded > 0) { + serversGettingNewSegments.add(server); + } + maxSegmentsAddedToServer = Math.max(maxSegmentsAddedToServer, segmentsAdded); + int segmentsDeleted = existingSegmentSet.size() - segmentsUnchanged; + + serverSegmentChangeInfoMap.put(server, new RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus, + totalNewSegments, totalExistingSegments, segmentsAdded, segmentsDeleted, segmentsUnchanged, + instanceToTagsMap.getOrDefault(server, null))); + } + + for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) { + String server = entry.getKey(); + if (!serverSegmentChangeInfoMap.containsKey(server)) { + serversRemoved.add(server); + serverSegmentChangeInfoMap.put(server, new RebalanceSummaryResult.ServerSegmentChangeInfo( + RebalanceSummaryResult.ServerStatus.REMOVED, 0, entry.getValue().size(), 0, entry.getValue().size(), 0, + instanceToTagsMap.getOrDefault(server, null))); + } + } + + RebalanceSummaryResult.RebalanceChangeInfo numSegmentsInSingleReplica + = new RebalanceSummaryResult.RebalanceChangeInfo(currentAssignment.size(), targetAssignment.size()); + + int existingNumberSegmentsTotal = existingReplicationFactor * currentAssignment.size(); + int newNumberSegmentsTotal = newReplicationFactor * targetAssignment.size(); + RebalanceSummaryResult.RebalanceChangeInfo numSegmentsAcrossAllReplicas + = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumberSegmentsTotal, newNumberSegmentsTotal); + + int totalSegmentsToBeMoved = newNumberSegmentsTotal - segmentsNotMoved; + + long tableSizePerReplicaInBytes = calculateTableSizePerReplicaInBytes(tableNameWithType); + long averageSegmentSizeInBytes = tableSizePerReplicaInBytes <= 0 ? tableSizePerReplicaInBytes + : tableSizePerReplicaInBytes / ((long) currentAssignment.size()); + long totalEstimatedDataToBeMovedInBytes = tableSizePerReplicaInBytes <= 0 ? tableSizePerReplicaInBytes + : ((long) totalSegmentsToBeMoved) * averageSegmentSizeInBytes; + + // Set some of the sets to null if they are empty to ensure they don't show up in the result + RebalanceSummaryResult.ServerInfo serverInfo = new RebalanceSummaryResult.ServerInfo( + serversGettingNewSegments.size(), numServers, serversAdded, serversRemoved, serversUnchanged, + serversGettingNewSegments, serverSegmentChangeInfoMap); + // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how + // rebalance time can vary with number of segments added + RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved, + maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes, + replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas); + + LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType, + rebalanceJobId); + return new RebalanceSummaryResult(serverInfo, segmentInfo); + } + private void onReturnFailure(String errorMsg, Exception e) { if (e != null) { LOGGER.warn(errorMsg, e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java index e115476529..661fff70d9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java @@ -60,7 +60,7 @@ public class DefaultTenantRebalancer implements TenantRebalancer { false)); } catch (TableNotFoundException exception) { rebalanceResult.put(table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), - null, null, null, null)); + null, null, null, null, null)); } }); if (config.isDryRun()) { @@ -71,7 +71,8 @@ public class DefaultTenantRebalancer implements TenantRebalancer { if (result.getStatus() == RebalanceResult.Status.DONE) { rebalanceResult.put(table, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS, "In progress, check controller task status for the", result.getInstanceAssignment(), - result.getTierInstanceAssignment(), result.getSegmentAssignment(), result.getPreChecksResult())); + result.getTierInstanceAssignment(), result.getSegmentAssignment(), result.getPreChecksResult(), + result.getRebalanceSummaryResult())); } } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java index c96c25b250..ed0caf0f29 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java @@ -36,7 +36,7 @@ public class TenantRebalanceResult { _rebalanceTableResults = new HashMap<>(); rebalanceTableResults.forEach((table, result) -> { _rebalanceTableResults.put(table, new RebalanceResult(result.getJobId(), result.getStatus(), - result.getDescription(), null, null, null, null)); + result.getDescription(), null, null, null, null, null)); }); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java index de83ec6eee..2f7ab350bc 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Map; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.assignment.InstancePartitions; @@ -91,15 +92,26 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + i, true); } + ExecutorService executorService = Executors.newFixedThreadPool(10); DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); - preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10)); - TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker); + preChecker.init(_helixResourceManager, executorService); + TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, null, preChecker, + _helixResourceManager.getTableSizeReader()); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build(); // Rebalance should fail without creating the table RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + assertNull(rebalanceResult.getRebalanceSummaryResult()); + + // Rebalance with dry-run summary should fail without creating the table + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + assertNull(rebalanceResult.getRebalanceSummaryResult()); // Create the table addDummySchema(RAW_TABLE_NAME); @@ -114,6 +126,27 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { Map<String, Map<String, String>> oldSegmentAssignment = _helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(); + // Rebalance with dry-run summary should return NO_OP status + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); + + // Dry-run mode should not change the IdealState + assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), + oldSegmentAssignment); + // Rebalance should return NO_OP status rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); @@ -137,8 +170,54 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + (numServers + i), true); } + // Rebalance in dry-run summary mode with added servers + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 14); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); + + Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = + rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo(); + assertNotNull(serverSegmentChangeInfoMap); + for (int i = 0; i < numServers; i++) { + // Original servers should be losing some segments + String newServer = SERVER_INSTANCE_ID_PREFIX + i; + RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); + assertTrue(serverSegmentChange.getSegmentsDeleted() > 0); + assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0); + assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0); + assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0); + assertEquals(serverSegmentChange.getSegmentsAdded(), 0); + } + for (int i = 0; i < numServersToAdd; i++) { + // New servers should only get new segments + String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i); + RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); + assertTrue(serverSegmentChange.getSegmentsAdded() > 0); + assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0); + assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), serverSegmentChange.getSegmentsAdded()); + assertEquals(serverSegmentChange.getSegmentsDeleted(), 0); + assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0); + } + + // Dry-run mode should not change the IdealState + assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), + oldSegmentAssignment); + // Rebalance in dry-run mode - RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDryRun(true); rebalanceConfig.setPreChecks(true); rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); @@ -186,6 +265,26 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), oldSegmentAssignment); + // Rebalance dry-run summary with 3 min available replicas should not be impacted since actual rebalance does not + // occur + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceConfig.setPreChecks(true); + rebalanceConfig.setMinAvailableReplicas(3); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertNotNull(rebalanceResult.getPreChecksResult()); + // Rebalance with 3 min available replicas should fail as the table only have 3 replicas rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setMinAvailableReplicas(3); @@ -218,6 +317,41 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); _helixResourceManager.updateTableConfig(tableConfig); + // Try dry-run summary mode + // No need to reassign instances because instances should be automatically assigned when updating the table config + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 11); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + + serverSegmentChangeInfoMap = rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo(); + assertNotNull(serverSegmentChangeInfoMap); + for (int i = 0; i < numServers + numServersToAdd; i++) { + String newServer = SERVER_INSTANCE_ID_PREFIX + i; + RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = serverSegmentChangeInfoMap.get(newServer); + assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5); + // Ensure not all segments moved + assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0); + // Ensure all segments has something assigned prior to rebalance + assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0); + } + + // Dry-run mode should not change the IdealState + assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(), + newSegmentAssignment); + + // Try actual rebalance // No need to reassign instances because instances should be automatically assigned when updating the table config rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); @@ -264,6 +398,25 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { tableConfig.setInstanceAssignmentConfigMap(null); _helixResourceManager.updateTableConfig(tableConfig); + // Try dry-run summary mode without reassignment to ensure that existing instance partitions are used + // no movement should occur + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + // Without instances reassignment, the rebalance should return status NO_OP, and the existing instance partitions // should be used rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); @@ -271,6 +424,26 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { assertEquals(rebalanceResult.getInstanceAssignment(), instanceAssignment); assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment); + // Try dry-run summary mode with reassignment + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceConfig.setReassignInstances(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + // No move expected since already balanced + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + // With instances reassignment, the rebalance should return status DONE, the existing instance partitions should be // removed, and the default instance partitions should be used rebalanceConfig = new RebalanceConfig(); @@ -300,6 +473,25 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { TagNameUtils.getOfflineTagForTenant(null)); } + // Try dry-run summary mode + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceConfig.setDowntime(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 15); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 3); + // Rebalance with downtime should succeed rebalanceConfig = new RebalanceConfig(); rebalanceConfig.setDowntime(true); @@ -327,7 +519,18 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { } } + // Try summary mode without dry-run set + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + assertNull(rebalanceResult.getRebalanceSummaryResult()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME); + executorService.shutdown(); } /** @@ -367,7 +570,27 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { _helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields(); TableRebalancer tableRebalancer = new TableRebalancer(_helixManager); - RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); + + // Try dry-run summary mode + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + + // Run actual table rebalance + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); // Segment assignment should not change assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); @@ -382,6 +605,24 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { } _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, TIER_B_NAME, 3, 3, 0)); + // Try dry-run summary mode, should be no-op + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + // rebalance is NOOP and no change in assignment caused by new instances rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); @@ -400,6 +641,24 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { TierFactory.PINOT_SERVER_STORAGE_TYPE, NO_TIER_NAME + "_OFFLINE", null, null))); _helixResourceManager.updateTableConfig(tableConfig); + // Try dry-run summary mode, some moves should occur + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 15); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 9); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 6); + // rebalance should change assignment rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); @@ -458,7 +717,26 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { _helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields(); TableRebalancer tableRebalancer = new TableRebalancer(_helixManager); - RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); + + // Try dry-run summary mode + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + RebalanceSummaryResult rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); // Segment assignment should not change assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment); @@ -469,6 +747,25 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX + i, false); } _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, "replicaAssignment" + TIER_A_NAME, 6, 6, 0)); + + // Try dry-run summary mode + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceResult.getRebalanceSummaryResult()); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 0); + // rebalance is NOOP and no change in assignment caused by new instances rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP); @@ -481,6 +778,24 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { TierFactory.PINOT_SERVER_STORAGE_TYPE, "replicaAssignment" + TIER_A_NAME + "_OFFLINE", null, null))); _helixResourceManager.updateTableConfig(tableConfig); + // Try dry-run summary mode + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceResult.getRebalanceSummaryResult()); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 30); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 3); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(), 6); + // rebalance should change assignment rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); @@ -504,6 +819,23 @@ public class TableRebalancerClusterStatelessTest extends ControllerTest { new InstanceAssignmentConfig(tagPoolConfig, null, replicaGroupPartitionConfig, null, false))); _helixResourceManager.updateTableConfig(tableConfig); + // Try dry-run summary mode + rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(rebalanceSummaryResult); + assertNotNull(rebalanceSummaryResult.getServerInfo()); + assertNotNull(rebalanceSummaryResult.getSegmentInfo()); + assertNull(rebalanceResult.getInstanceAssignment()); + assertNull(rebalanceResult.getTierInstanceAssignment()); + assertNull(rebalanceResult.getSegmentAssignment()); + assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 13); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), 6); + assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), 6); + rebalanceResult = tableRebalancer.rebalance(tableConfig, new RebalanceConfig(), null); assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME)); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 6750a7340d..7b8e4168a7 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList; import java.io.File; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; import java.sql.ResultSet; import java.sql.Statement; import java.sql.Timestamp; @@ -40,6 +42,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -64,10 +67,12 @@ import org.apache.pinot.common.utils.FileUploadDownloadClient; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.SimpleHttpResponse; import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker; import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator; import org.apache.pinot.segment.spi.index.ForwardIndexConfig; @@ -176,6 +181,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet private PinotHelixResourceManager _resourceManager; private TableRebalancer _tableRebalancer; + private ExecutorService _executorService; protected int getNumBrokers() { return NUM_BROKERS; @@ -287,8 +293,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet _resourceManager = _controllerStarter.getHelixResourceManager(); DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker(); - preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10)); - _tableRebalancer = new TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker); + _executorService = Executors.newFixedThreadPool(10); + preChecker.init(_helixResourceManager, _executorService); + _tableRebalancer = new TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker, + _resourceManager.getTableSizeReader()); } private void reloadAllSegments(String testQuery, boolean forceDownload, long numTotalDocs) @@ -3062,6 +3070,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet stopController(); stopZk(); FileUtils.deleteDirectory(_tempDir); + _executorService.shutdown(); } private void testInstanceDecommission() @@ -4018,4 +4027,199 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(result.get("clientRequestId").asText(), clientRequestId); } + + @Test + public void testRebalanceDryRunSummary() + throws Exception { + // setup the rebalance config + RebalanceConfig rebalanceConfig = new RebalanceConfig(); + rebalanceConfig.setDryRun(true); + + TableConfig tableConfig = getOfflineTableConfig(); + + // Ensure summary status is null if not enabled + RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertNull(rebalanceResult.getRebalanceSummaryResult()); + + // Enable summary, nothing is set + rebalanceConfig.setSummary(true); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP, false, getNumServers(), getNumServers(), + tableConfig.getReplication()); + + // Add a new server (to force change in instance assignment) and enable reassignInstances + BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS); + rebalanceConfig.setReassignInstances(true); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.DONE, true, getNumServers(), + getNumServers() + 1, tableConfig.getReplication()); + + // Disable dry-run, summary still enabled + rebalanceConfig.setDryRun(false); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertNull(rebalanceResult.getRebalanceSummaryResult()); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED); + + // Disable summary along with dry-run to do a real rebalance + rebalanceConfig.setSummary(false); + rebalanceConfig.setDowntime(true); + rebalanceConfig.setReassignInstances(true); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertNull(rebalanceResult.getRebalanceSummaryResult()); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + + waitForRebalanceToComplete(rebalanceResult, 600_000L); + + // Untag the added server + _resourceManager.updateInstanceTags(serverStarter1.getInstanceId(), "", false); + + // Re-enable dry-run + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.DONE, true, getNumServers() + 1, + getNumServers(), tableConfig.getReplication()); + + // Disable dry-run and summary to do a real rebalance + rebalanceConfig.setDryRun(false); + rebalanceConfig.setSummary(false); + rebalanceConfig.setDowntime(true); + rebalanceConfig.setReassignInstances(true); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + assertNull(rebalanceResult.getRebalanceSummaryResult()); + assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE); + + waitForRebalanceToComplete(rebalanceResult, 600_000L); + + // Stop the server + serverStarter1.stop(); + TestUtils.waitForCondition(aVoid -> _resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful(), + 60_000L, "Failed to drop added server"); + + // Try dry-run with summary again + rebalanceConfig.setDryRun(true); + rebalanceConfig.setSummary(true); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP, false, getNumServers(), getNumServers(), + tableConfig.getReplication()); + + // Try dry-run with summary and pre-checks + rebalanceConfig.setPreChecks(true); + rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, null); + checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP, false, getNumServers(), getNumServers(), + tableConfig.getReplication()); + assertNotNull(rebalanceResult.getPreChecksResult()); + assertTrue(rebalanceResult.getPreChecksResult().containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS)); + assertTrue(rebalanceResult.getPreChecksResult().containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT)); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS), "false"); + assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT), + "false"); + } + + private void checkRebalanceDryRunSummary(RebalanceResult rebalanceResult, RebalanceResult.Status expectedStatus, + boolean isSegmentsToBeMoved, int existingNumServers, int newNumServers, int replicationFactor) { + assertEquals(rebalanceResult.getStatus(), expectedStatus); + RebalanceSummaryResult summaryResult = rebalanceResult.getRebalanceSummaryResult(); + assertNotNull(summaryResult); + assertNotNull(summaryResult.getServerInfo()); + assertNotNull(summaryResult.getSegmentInfo()); + assertEquals(summaryResult.getSegmentInfo().getReplicationFactor().getValueBeforeRebalance(), replicationFactor, + "Existing replication factor doesn't match expected"); + assertEquals(summaryResult.getSegmentInfo().getReplicationFactor().getValueBeforeRebalance(), + summaryResult.getSegmentInfo().getReplicationFactor().getExpectedValueAfterRebalance(), + "Existing and new replication factor doesn't match"); + assertEquals(summaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), existingNumServers, + "Existing number of servers don't match"); + assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), newNumServers, + "New number of servers don't match"); + if (_tableSize > 0) { + assertTrue(summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes() > 0L, + "Avg segment size expected to be > 0 but found to be 0"); + } + assertEquals(summaryResult.getServerInfo().getNumServersGettingNewSegments(), + summaryResult.getServerInfo().getServersGettingNewSegments().size()); + if (existingNumServers != newNumServers) { + assertTrue(summaryResult.getServerInfo().getNumServersGettingNewSegments() > 0, + "Expected number of servers should be > 0"); + } else { + assertEquals(summaryResult.getServerInfo().getNumServersGettingNewSegments(), 0, + "Expected number of servers getting new segments should be 0"); + } + + if (isSegmentsToBeMoved) { + assertTrue(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved() > 0, + "Segments to be moved should be > 0"); + assertEquals(summaryResult.getSegmentInfo().getTotalEstimatedDataToBeMovedInBytes(), + summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved() + * summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes(), + "Estimated data to be moved in bytes doesn't match"); + assertTrue(summaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer() > 0, + "Estimated max number of segments to move in a single server should be > 0"); + } else { + assertEquals(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0, "Segments to be moved should be 0"); + assertEquals(summaryResult.getSegmentInfo().getTotalEstimatedDataToBeMovedInBytes(), 0L, + "Estimated data to be moved in bytes should be 0"); + assertEquals(summaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer(), 0, + "Estimated max number of segments to move in a single server should be 0"); + } + + // Validate server status stats with numServers information + Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = + summaryResult.getServerInfo().getServerSegmentChangeInfo(); + int numServersAdded = 0; + int numServersRemoved = 0; + int numServersUnchanged = 0; + for (RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChangeInfo : serverSegmentChangeInfoMap.values()) { + switch (serverSegmentChangeInfo.getServerStatus()) { + case UNCHANGED: + numServersUnchanged++; + break; + case ADDED: + numServersAdded++; + break; + case REMOVED: + numServersRemoved++; + break; + default: + Assert.fail(String.format("Unknown server status encountered: %s", + serverSegmentChangeInfo.getServerStatus())); + break; + } + } + + Assert.assertEquals(summaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), + numServersRemoved + numServersUnchanged); + Assert.assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), + numServersAdded + numServersUnchanged); + + assertEquals(numServersAdded, summaryResult.getServerInfo().getServersAdded().size()); + assertEquals(numServersRemoved, summaryResult.getServerInfo().getServersRemoved().size()); + assertEquals(numServersUnchanged, summaryResult.getServerInfo().getServersUnchanged().size()); + } + + protected void waitForRebalanceToComplete(RebalanceResult rebalanceResult, long timeoutMs) { + String jobId = rebalanceResult.getJobId(); + if (rebalanceResult.getStatus() != RebalanceResult.Status.IN_PROGRESS) { + return; + } + + TestUtils.waitForCondition(aVoid -> { + try { + String requestUrl = getControllerRequestURLBuilder().forTableRebalanceStatus(jobId); + try { + SimpleHttpResponse httpResponse = + HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new URL(requestUrl).toURI(), null)); + + ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = + JsonUtils.stringToObject(httpResponse.getResponse(), ServerRebalanceJobStatusResponse.class); + RebalanceResult.Status status = serverRebalanceJobStatusResponse.getTableRebalanceProgressStats().getStatus(); + return status != RebalanceResult.Status.IN_PROGRESS; + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } catch (Exception e) { + return null; + } + }, 1000L, timeoutMs, "Failed to load all segments after rebalance"); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org