This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 31e94d3b18 Add a dry-run summary mode for TableRebalance which only returns a summary of the dry-run results (#15050)
31e94d3b18 is described below

commit 31e94d3b180a9a0aeda033040020a8beebacf9e6
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Wed Feb 19 13:37:12 2025 -0800

    Add a dry-run summary mode for TableRebalance which only returns a summary of the dry-run results (#15050)
    
    * Add a dry-run summary mode for TableRebalance which only returns a summary of the dry-run results
    
    * Address review - add server level lists of added, removed, unchanged, and servers getting new segments
---
 .../pinot/controller/BaseControllerStarter.java    |   1 +
 .../api/resources/PinotTableRestletResource.java   |  10 +-
 .../helix/core/PinotHelixResourceManager.java      |  18 +-
 .../core/rebalance/DefaultRebalancePreChecker.java |   3 +-
 .../helix/core/rebalance/RebalanceConfig.java      |  22 +-
 .../helix/core/rebalance/RebalanceResult.java      |  11 +-
 .../core/rebalance/RebalanceSummaryResult.java     | 325 ++++++++++++++++++++
 .../helix/core/rebalance/TableRebalancer.java      | 228 ++++++++++++--
 .../rebalance/tenant/DefaultTenantRebalancer.java  |   5 +-
 .../rebalance/tenant/TenantRebalanceResult.java    |   2 +-
 .../TableRebalancerClusterStatelessTest.java       | 342 ++++++++++++++++++++-
 .../tests/OfflineClusterIntegrationTest.java       | 208 ++++++++++++-
 12 files changed, 1126 insertions(+), 49 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 5761cb27e5..9a4d080c7c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -512,6 +512,7 @@ public abstract class BaseControllerStarter implements 
ServiceStartable {
     _tableSizeReader =
         new TableSizeReader(_executorService, _connectionManager, 
_controllerMetrics, _helixResourceManager,
             _leadControllerManager);
+    _helixResourceManager.registerTableSizeReader(_tableSizeReader);
     _storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader, 
_controllerMetrics, _leadControllerManager,
         _helixResourceManager, _config);
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 8930c2f643..8815d20d06 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -606,6 +606,9 @@ public class PinotTableRestletResource {
       @ApiParam(value = "OFFLINE|REALTIME", required = true) 
@QueryParam("type") String tableTypeStr,
       @ApiParam(value = "Whether to rebalance table in dry-run mode") 
@DefaultValue("false") @QueryParam("dryRun")
       boolean dryRun,
+      @ApiParam(value = "Whether to return dry-run summary instead of full, 
dry-run must be enabled to use this")
+      @DefaultValue("false") @QueryParam("summary")
+      boolean summary,
       @ApiParam(value = "Whether to enable pre-checks for table, must be in 
dry-run mode to enable")
       @DefaultValue("false") @QueryParam("preChecks") boolean preChecks,
       @ApiParam(value = "Whether to reassign instances before reassigning 
segments") @DefaultValue("false")
@@ -648,6 +651,7 @@ public class PinotTableRestletResource {
     String tableNameWithType = constructTableNameWithType(tableName, 
tableTypeStr);
     RebalanceConfig rebalanceConfig = new RebalanceConfig();
     rebalanceConfig.setDryRun(dryRun);
+    rebalanceConfig.setSummary(summary);
     rebalanceConfig.setPreChecks(preChecks);
     rebalanceConfig.setReassignInstances(reassignInstances);
     rebalanceConfig.setIncludeConsuming(includeConsuming);
@@ -668,7 +672,7 @@ public class PinotTableRestletResource {
     String rebalanceJobId = 
TableRebalancer.createUniqueRebalanceJobIdentifier();
 
     try {
-      if (dryRun || preChecks || downtime) {
+      if (dryRun || summary || preChecks || downtime) {
         // For dry-run, preChecks or rebalance with downtime, directly return 
the rebalance result as it should return
         // immediately
         return _pinotHelixResourceManager.rebalanceTable(tableNameWithType, 
rebalanceConfig, rebalanceJobId, false);
@@ -689,7 +693,7 @@ public class PinotTableRestletResource {
               String errorMsg = String.format("Caught exception/error while 
rebalancing table: %s", tableNameWithType);
               LOGGER.error(errorMsg, t);
               return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null,
-                  null);
+                  null, null);
             }
           });
           boolean isJobIdPersisted = waitForRebalanceToPersist(
@@ -710,7 +714,7 @@ public class PinotTableRestletResource {
           return new RebalanceResult(dryRunResult.getJobId(), 
RebalanceResult.Status.IN_PROGRESS,
               "In progress, check controller logs for updates", 
dryRunResult.getInstanceAssignment(),
               dryRunResult.getTierInstanceAssignment(), 
dryRunResult.getSegmentAssignment(),
-              dryRunResult.getPreChecksResult());
+              dryRunResult.getPreChecksResult(), 
dryRunResult.getRebalanceSummaryResult());
         } else {
           // If dry-run failed or is no-op, return the dry-run result
           return dryRunResult;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 2c31458fcd..fd36bf44ea 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -159,6 +159,7 @@ import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import 
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
 import org.apache.pinot.controller.helix.starter.HelixConfig;
+import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.DatabaseConfig;
 import org.apache.pinot.spi.config.instance.Instance;
@@ -241,6 +242,7 @@ public class PinotHelixResourceManager {
   private TableCache _tableCache;
   private final LineageManager _lineageManager;
   private final RebalancePreChecker _rebalancePreChecker;
+  private TableSizeReader _tableSizeReader;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, 
@Nullable String dataDir,
       boolean isSingleTenantCluster, boolean enableBatchMessageMode, int 
deletedSegmentsRetentionInDays,
@@ -449,6 +451,15 @@ public class PinotHelixResourceManager {
   }
 
   /**
+   * Get the table size reader.
+   *
+   * @return table size reader
+   */
+  public TableSizeReader getTableSizeReader() {
+    return _tableSizeReader;
+  }
+
+/**
    * Instance related APIs
    */
 
@@ -1943,6 +1954,10 @@ public class PinotHelixResourceManager {
     _pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
   }
 
+  public void registerTableSizeReader(TableSizeReader tableSizeReader) {
+    _tableSizeReader = tableSizeReader;
+  }
+
   private void assignInstances(TableConfig tableConfig, boolean override) {
     String tableNameWithType = tableConfig.getTableName();
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -3612,7 +3627,8 @@ public class PinotHelixResourceManager {
       tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType, 
tableConfig);
     }
     TableRebalancer tableRebalancer =
-        new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, 
_controllerMetrics, _rebalancePreChecker);
+        new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver, 
_controllerMetrics, _rebalancePreChecker,
+            _tableSizeReader);
     return tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
rebalanceJobId, tierToSegmentsMap);
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
index 945a98e48e..b6a89208ee 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -53,8 +53,7 @@ public class DefaultRebalancePreChecker implements 
RebalancePreChecker {
   }
 
   @Override
-  public Map<String, String> check(String rebalanceJobId, String 
tableNameWithType,
-      TableConfig tableConfig) {
+  public Map<String, String> check(String rebalanceJobId, String 
tableNameWithType, TableConfig tableConfig) {
     LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}", 
tableNameWithType, rebalanceJobId);
 
     Map<String, String> preCheckResult = new HashMap<>();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index e5bd39f0ec..06a9e171c3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -34,6 +34,11 @@ public class RebalanceConfig {
   @ApiModelProperty(example = "false")
   private boolean _dryRun = false;
 
+  // Whether to return only dry-run summary instead of full dry-run output, 
can only be used in dry-run mode
+  @JsonProperty("summary")
+  @ApiModelProperty(example = "false")
+  private boolean _summary = false;
+
   // Whether to perform pre-checks for rebalance. This only returns the status 
of each pre-check and does not fail
   // rebalance
   @JsonProperty("preChecks")
@@ -124,6 +129,14 @@ public class RebalanceConfig {
     _dryRun = dryRun;
   }
 
+  public boolean isSummary() {
+    return _summary;
+  }
+
+  public void setSummary(boolean summary) {
+    _summary = summary;
+  }
+
   public boolean isPreChecks() {
     return _preChecks;
   }
@@ -246,10 +259,10 @@ public class RebalanceConfig {
 
   @Override
   public String toString() {
-    return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" + 
_preChecks + ", _reassignInstances="
-        + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ", 
_bootstrap=" + _bootstrap
-        + ", _downtime=" + _downtime + ", _minAvailableReplicas=" + 
_minAvailableReplicas + ", _bestEfforts="
-        + _bestEfforts + ", _externalViewCheckIntervalInMs=" + 
_externalViewCheckIntervalInMs
+    return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", _summary=" + 
_summary + ", preChecks=" + _preChecks
+        + ", _reassignInstances=" + _reassignInstances + ", 
_includeConsuming=" + _includeConsuming + ", _bootstrap="
+        + _bootstrap + ", _downtime=" + _downtime + ", _minAvailableReplicas=" 
+ _minAvailableReplicas
+        + ", _bestEfforts=" + _bestEfforts + ", 
_externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
         + ", _externalViewStabilizationTimeoutInMs=" + 
_externalViewStabilizationTimeoutInMs + ", _updateTargetTier="
         + _updateTargetTier + ", _heartbeatIntervalInMs=" + 
_heartbeatIntervalInMs + ", _heartbeatTimeoutInMs="
         + _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ", 
_retryInitialDelayInMs="
@@ -259,6 +272,7 @@ public class RebalanceConfig {
   public static RebalanceConfig copy(RebalanceConfig cfg) {
     RebalanceConfig rc = new RebalanceConfig();
     rc._dryRun = cfg._dryRun;
+    rc._summary = cfg._summary;
     rc._preChecks = cfg._preChecks;
     rc._reassignInstances = cfg._reassignInstances;
     rc._includeConsuming = cfg._includeConsuming;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index 2b81d8d78b..ac768a8c46 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -42,6 +42,8 @@ public class RebalanceResult {
   private final String _description;
   @JsonInclude(JsonInclude.Include.NON_NULL)
   private final Map<String, String> _preChecksResult;
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final RebalanceSummaryResult _rebalanceSummaryResult;
 
   @JsonCreator
   public RebalanceResult(@JsonProperty(value = "jobId", required = true) 
String jobId,
@@ -50,7 +52,8 @@ public class RebalanceResult {
       @JsonProperty("instanceAssignment") @Nullable 
Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
       @JsonProperty("tierInstanceAssignment") @Nullable Map<String, 
InstancePartitions> tierInstanceAssignment,
       @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, 
String>> segmentAssignment,
-      @JsonProperty("preChecksResult") @Nullable Map<String, String> 
preChecksResult) {
+      @JsonProperty("preChecksResult") @Nullable Map<String, String> 
preChecksResult,
+      @JsonProperty("rebalanceSummaryResult") @Nullable RebalanceSummaryResult 
rebalanceSummaryResult) {
     _jobId = jobId;
     _status = status;
     _description = description;
@@ -58,6 +61,7 @@ public class RebalanceResult {
     _tierInstanceAssignment = tierInstanceAssignment;
     _segmentAssignment = segmentAssignment;
     _preChecksResult = preChecksResult;
+    _rebalanceSummaryResult = rebalanceSummaryResult;
   }
 
   @JsonProperty
@@ -95,6 +99,11 @@ public class RebalanceResult {
     return _preChecksResult;
   }
 
+  @JsonProperty
+  public RebalanceSummaryResult getRebalanceSummaryResult() {
+    return _rebalanceSummaryResult;
+  }
+
   public enum Status {
     // FAILED if the job has ended with known exceptions;
     // ABORTED if the job is stopped by others but retry is still allowed;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
new file mode 100644
index 0000000000..3169c9b367
--- /dev/null
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * Holds the summary data of the rebalance result
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class RebalanceSummaryResult {
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final ServerInfo _serverInfo;
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  private final SegmentInfo _segmentInfo;
+
+  /**
+   * Constructor for RebalanceSummaryResult
+   * @param serverInfo server related summary information
+   * @param segmentInfo segment related summary information
+   */
+  @JsonCreator
+  public RebalanceSummaryResult(@JsonProperty("serverInfo") @Nullable 
ServerInfo serverInfo,
+      @JsonProperty("segmentInfo") @Nullable SegmentInfo segmentInfo) {
+    _serverInfo = serverInfo;
+    _segmentInfo = segmentInfo;
+  }
+
+  @JsonProperty
+  public ServerInfo getServerInfo() {
+    return _serverInfo;
+  }
+
+  @JsonProperty
+  public SegmentInfo getSegmentInfo() {
+    return _segmentInfo;
+  }
+
+  public static class ServerSegmentChangeInfo {
+    private final ServerStatus _serverStatus;
+    private final int _totalSegmentsAfterRebalance;
+    private final int _totalSegmentsBeforeRebalance;
+    private final int _segmentsAdded;
+    private final int _segmentsDeleted;
+    private final int _segmentsUnchanged;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final List<String> _tagList;
+
+    /**
+     * Constructor for ServerSegmentChangeInfo
+     * @param serverStatus server status, whether it was added, removed, or 
unchanged as part of this rebalance
+     * @param totalSegmentsAfterRebalance expected total segments on this 
server after rebalance
+     * @param totalSegmentsBeforeRebalance current number of segments on this 
server before rebalance
+     * @param segmentsAdded number of segments expected to be added as part of 
this rebalance
+     * @param segmentsDeleted number of segments expected to be deleted as 
part of this rebalance
+     * @param segmentsUnchanged number of segments that aren't moving from 
this server as part of this rebalance
+     * @param tagList server tag list
+     */
+    @JsonCreator
+    public ServerSegmentChangeInfo(@JsonProperty("serverStatus") ServerStatus 
serverStatus,
+        @JsonProperty("totalSegmentsAfterRebalance") int 
totalSegmentsAfterRebalance,
+        @JsonProperty("totalSegmentsBeforeRebalance") int 
totalSegmentsBeforeRebalance,
+        @JsonProperty("segmentsAdded") int segmentsAdded, 
@JsonProperty("segmentsDeleted") int segmentsDeleted,
+        @JsonProperty("segmentsUnchanged") int segmentsUnchanged,
+        @JsonProperty("tagList") @Nullable List<String> tagList) {
+      _serverStatus = serverStatus;
+      _totalSegmentsAfterRebalance = totalSegmentsAfterRebalance;
+      _totalSegmentsBeforeRebalance = totalSegmentsBeforeRebalance;
+      _segmentsAdded = segmentsAdded;
+      _segmentsDeleted = segmentsDeleted;
+      _segmentsUnchanged = segmentsUnchanged;
+      _tagList = tagList;
+    }
+
+    @JsonProperty
+    public ServerStatus getServerStatus() {
+      return _serverStatus;
+    }
+
+    @JsonProperty
+    public int getTotalSegmentsAfterRebalance() {
+      return _totalSegmentsAfterRebalance;
+    }
+
+    @JsonProperty
+    public int getTotalSegmentsBeforeRebalance() {
+      return _totalSegmentsBeforeRebalance;
+    }
+
+    @JsonProperty
+    public int getSegmentsAdded() {
+      return _segmentsAdded;
+    }
+
+    @JsonProperty
+    public int getSegmentsDeleted() {
+      return _segmentsDeleted;
+    }
+
+    @JsonProperty
+    public int getSegmentsUnchanged() {
+      return _segmentsUnchanged;
+    }
+
+    @JsonProperty
+    public List<String> getTagList() {
+      return _tagList;
+    }
+  }
+
+  public static class RebalanceChangeInfo {
+    private final int _valueBeforeRebalance;
+    private final int _expectedValueAfterRebalance;
+
+    /**
+     * Constructor for RebalanceChangeInfo
+     * @param valueBeforeRebalance current value before rebalance
+     * @param expectedValueAfterRebalance expected value after rebalance
+     */
+    @JsonCreator
+    public RebalanceChangeInfo(@JsonProperty("valueBeforeRebalance") int 
valueBeforeRebalance,
+        @JsonProperty("expectedValueAfterRebalance") int 
expectedValueAfterRebalance) {
+      _valueBeforeRebalance = valueBeforeRebalance;
+      _expectedValueAfterRebalance = expectedValueAfterRebalance;
+    }
+
+    @JsonProperty
+    public int getValueBeforeRebalance() {
+      return _valueBeforeRebalance;
+    }
+
+    @JsonProperty
+    public int getExpectedValueAfterRebalance() {
+      return _expectedValueAfterRebalance;
+    }
+  }
+
+  public static class ServerInfo {
+    private final int _numServersGettingNewSegments;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final RebalanceChangeInfo _numServers;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Set<String> _serversAdded;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Set<String> _serversRemoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Set<String> _serversUnchanged;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Set<String> _serversGettingNewSegments;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Map<String, ServerSegmentChangeInfo> 
_serverSegmentChangeInfo;
+
+    /**
+     * Constructor for ServerInfo
+     * @param numServersGettingNewSegments total number of servers receiving 
new segments as part of this rebalance
+     * @param numServers number of servers before and after this rebalance
+     * @param serversAdded set of servers getting added as part of this 
rebalance
+     * @param serversRemoved set of servers getting removed as part of this 
rebalance
+     * @param serversUnchanged set of servers existing both before and as part 
of this rebalance
+     * @param serversGettingNewSegments set of servers getting segments added
+     * @param serverSegmentChangeInfo per server statistics for this rebalance
+     */
+    @JsonCreator
+    public ServerInfo(@JsonProperty("numServersGettingNewSegments") int 
numServersGettingNewSegments,
+        @JsonProperty("numServers") @Nullable RebalanceChangeInfo numServers,
+        @JsonProperty("serversAdded") @Nullable Set<String> serversAdded,
+        @JsonProperty("serversRemoved") @Nullable Set<String> serversRemoved,
+        @JsonProperty("serversUnchanged") @Nullable Set<String> 
serversUnchanged,
+        @JsonProperty("serversGettingNewSegments") @Nullable Set<String> 
serversGettingNewSegments,
+        @JsonProperty("serverSegmentChangeInfo")
+        @Nullable Map<String, ServerSegmentChangeInfo> 
serverSegmentChangeInfo) {
+      _numServersGettingNewSegments = numServersGettingNewSegments;
+      _numServers = numServers;
+      _serversAdded = serversAdded;
+      _serversRemoved = serversRemoved;
+      _serversUnchanged = serversUnchanged;
+      _serversGettingNewSegments = serversGettingNewSegments;
+      _serverSegmentChangeInfo = serverSegmentChangeInfo;
+    }
+
+    @JsonProperty
+    public int getNumServersGettingNewSegments() {
+      return _numServersGettingNewSegments;
+    }
+
+    @JsonProperty
+    public RebalanceChangeInfo getNumServers() {
+      return _numServers;
+    }
+
+    @JsonProperty
+    public Set<String> getServersAdded() {
+      return _serversAdded;
+    }
+
+    @JsonProperty
+    public Set<String> getServersRemoved() {
+      return _serversRemoved;
+    }
+
+    @JsonProperty
+    public Set<String> getServersUnchanged() {
+      return _serversUnchanged;
+    }
+
+    @JsonProperty
+    public Set<String> getServersGettingNewSegments() {
+      return _serversGettingNewSegments;
+    }
+
+    @JsonProperty
+    public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
+      return _serverSegmentChangeInfo;
+    }
+  }
+
+  public static class SegmentInfo {
+    // TODO: Add a metric to estimate the total time it will take to rebalance
+    private final int _totalSegmentsToBeMoved;
+    private final int _maxSegmentsAddedToASingleServer;
+    private final long _estimatedAverageSegmentSizeInBytes;
+    private final long _totalEstimatedDataToBeMovedInBytes;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final RebalanceChangeInfo _replicationFactor;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final RebalanceChangeInfo _numSegmentsInSingleReplica;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final RebalanceChangeInfo _numSegmentsAcrossAllReplicas;
+
+    /**
+     * Constructor for SegmentInfo
+     * @param totalSegmentsToBeMoved total number of segments to be moved as 
part of this rebalance
+     * @param maxSegmentsAddedToASingleServer maximum segments added to a 
single server as part of this rebalance
+     * @param estimatedAverageSegmentSizeInBytes estimated average size of 
segments in bytes
+     * @param totalEstimatedDataToBeMovedInBytes total estimated amount of 
data to be moved as part of this rebalance
+     * @param replicationFactor replication factor before and after this 
rebalance
+     * @param numSegmentsInSingleReplica number of segments in single replica 
before and after this rebalance
+     * @param numSegmentsAcrossAllReplicas total number of segments across all 
replicas before and after this rebalance
+     */
+    @JsonCreator
+    public SegmentInfo(@JsonProperty("totalSegmentsToBeMoved") int 
totalSegmentsToBeMoved,
+        @JsonProperty("maxSegmentsAddedToASingleServer") int 
maxSegmentsAddedToASingleServer,
+        @JsonProperty("estimatedAverageSegmentSizeInBytes") long 
estimatedAverageSegmentSizeInBytes,
+        @JsonProperty("totalEstimatedDataToBeMovedInBytes") long 
totalEstimatedDataToBeMovedInBytes,
+        @JsonProperty("replicationFactor") @Nullable RebalanceChangeInfo 
replicationFactor,
+        @JsonProperty("numSegmentsInSingleReplica") @Nullable 
RebalanceChangeInfo numSegmentsInSingleReplica,
+        @JsonProperty("numSegmentsAcrossAllReplicas") @Nullable 
RebalanceChangeInfo numSegmentsAcrossAllReplicas) {
+      _totalSegmentsToBeMoved = totalSegmentsToBeMoved;
+      _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer;
+      _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes;
+      _totalEstimatedDataToBeMovedInBytes = totalEstimatedDataToBeMovedInBytes;
+      _replicationFactor = replicationFactor;
+      _numSegmentsInSingleReplica = numSegmentsInSingleReplica;
+      _numSegmentsAcrossAllReplicas = numSegmentsAcrossAllReplicas;
+    }
+
+    @JsonProperty
+    public int getTotalSegmentsToBeMoved() {
+      return _totalSegmentsToBeMoved;
+    }
+
+    @JsonProperty
+    public int getMaxSegmentsAddedToASingleServer() {
+      return _maxSegmentsAddedToASingleServer;
+    }
+
+    @JsonProperty
+    public long getEstimatedAverageSegmentSizeInBytes() {
+      return _estimatedAverageSegmentSizeInBytes;
+    }
+
+    @JsonProperty
+    public long getTotalEstimatedDataToBeMovedInBytes() {
+      return _totalEstimatedDataToBeMovedInBytes;
+    }
+
+    @JsonProperty
+    public RebalanceChangeInfo getReplicationFactor() {
+      return _replicationFactor;
+    }
+
+    @JsonProperty
+    public RebalanceChangeInfo getNumSegmentsInSingleReplica() {
+      return _numSegmentsInSingleReplica;
+    }
+
+    @JsonProperty
+    public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() {
+      return _numSegmentsAcrossAllReplicas;
+    }
+  }
+
+  public enum ServerStatus {
+    // ADDED if the server is newly added as part of rebalance;
+    // REMOVED if the server is removed as part of rebalance;
+    // UNCHANGED if the server status is unchanged as part of rebalance;
+    ADDED, REMOVED, UNCHANGED
+  }
+}
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 275994e57a..b5d56f2ba6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -43,11 +43,13 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
 import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.exception.InvalidConfigException;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ControllerTimer;
 import org.apache.pinot.common.tier.PinotServerTierStorage;
@@ -60,6 +62,7 @@ import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.StrictRealtimeSegmentAssignment;
+import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.RoutingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -120,9 +123,11 @@ public class TableRebalancer {
   private final TableRebalanceObserver _tableRebalanceObserver;
   private final ControllerMetrics _controllerMetrics;
   private final RebalancePreChecker _rebalancePreChecker;
+  private final TableSizeReader _tableSizeReader;
 
   public TableRebalancer(HelixManager helixManager, @Nullable 
TableRebalanceObserver tableRebalanceObserver,
-      @Nullable ControllerMetrics controllerMetrics, @Nullable 
RebalancePreChecker rebalancePreChecker) {
+      @Nullable ControllerMetrics controllerMetrics, @Nullable 
RebalancePreChecker rebalancePreChecker,
+      @Nullable TableSizeReader tableSizeReader) {
     _helixManager = helixManager;
     if (tableRebalanceObserver != null) {
       _tableRebalanceObserver = tableRebalanceObserver;
@@ -132,10 +137,11 @@ public class TableRebalancer {
     _helixDataAccessor = helixManager.getHelixDataAccessor();
     _controllerMetrics = controllerMetrics;
     _rebalancePreChecker = rebalancePreChecker;
+    _tableSizeReader = tableSizeReader;
   }
 
   public TableRebalancer(HelixManager helixManager) {
-    this(helixManager, null, null, null);
+    this(helixManager, null, null, null, null);
   }
 
   public static String createUniqueRebalanceJobIdentifier() {
@@ -175,6 +181,7 @@ public class TableRebalancer {
       rebalanceJobId = createUniqueRebalanceJobIdentifier();
     }
     boolean dryRun = rebalanceConfig.isDryRun();
+    boolean summary = rebalanceConfig.isSummary();
     boolean preChecks = rebalanceConfig.isPreChecks();
     boolean reassignInstances = rebalanceConfig.isReassignInstances();
     boolean includeConsuming = rebalanceConfig.isIncludeConsuming();
@@ -189,14 +196,19 @@ public class TableRebalancer {
         && 
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
         tableConfig.getRoutingConfig().getInstanceSelectorType());
     LOGGER.info(
-        "Start rebalancing table: {} with dryRun: {}, preChecks: {}, 
reassignInstances: {}, includeConsuming: {}, "
-            + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime: 
{}, enableStrictReplicaGroup: {}, "
-            + "lowDiskMode: {}, bestEfforts: {}, 
externalViewCheckIntervalInMs: {}, "
+        "Start rebalancing table: {} with dryRun: {}, summary: {}, preChecks: 
{}, reassignInstances: {}, "
+            + "includeConsuming: {}, bootstrap: {}, downtime: {}, 
minReplicasToKeepUpForNoDowntime: {}, "
+            + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {}, 
externalViewCheckIntervalInMs: {}, "
             + "externalViewStabilizationTimeoutInMs: {}",
-        tableNameWithType, dryRun, preChecks, reassignInstances, 
includeConsuming, bootstrap, downtime,
+        tableNameWithType, dryRun, summary, preChecks, reassignInstances, 
includeConsuming, bootstrap, downtime,
         minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup, 
lowDiskMode, bestEfforts,
         externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
 
+    if (summary && !dryRun) {
+      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
+          "Must enable dry-run mode to use summary mode", null, null, null, 
null, null);
+    }
+
     // Perform pre-checks if enabled
     Map<String, String> preChecksResult = null;
     if (preChecks) {
@@ -205,7 +217,8 @@ public class TableRebalancer {
         String errorMsg = String.format("Pre-checks can only be enabled in 
dry-run mode, not triggering rebalance for "
             + "table: %s with rebalanceJobId: %s", tableNameWithType, 
rebalanceJobId);
         LOGGER.error(errorMsg);
-        return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null, null);
+        return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, errorMsg, null, null, null, null,
+            null);
       }
       if (_rebalancePreChecker != null) {
         preChecksResult = _rebalancePreChecker.check(rebalanceJobId, 
tableNameWithType, tableConfig);
@@ -222,21 +235,21 @@ public class TableRebalancer {
           "For rebalanceId: %s, caught exception while fetching IdealState for 
table: %s, aborting the rebalance",
           rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-          "Caught exception while fetching IdealState: " + e, null, null, 
null, preChecksResult);
+          "Caught exception while fetching IdealState: " + e, null, null, 
null, preChecksResult, null);
     }
     if (currentIdealState == null) {
       onReturnFailure(
           String.format("For rebalanceId: %s, cannot find the IdealState for 
table: %s, aborting the rebalance",
               rebalanceJobId, tableNameWithType), null);
       return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED, "Cannot find the IdealState for table",
-          null, null, null, preChecksResult);
+          null, null, null, preChecksResult, null);
     }
     if (!currentIdealState.isEnabled() && !downtime) {
       onReturnFailure(String.format(
           "For rebalanceId: %s, cannot rebalance disabled table: %s without 
downtime, aborting the rebalance",
           rebalanceJobId, tableNameWithType), null);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-          "Cannot rebalance disabled table without downtime", null, null, 
null, preChecksResult);
+          "Cannot rebalance disabled table without downtime", null, null, 
null, preChecksResult, null);
     }
 
     LOGGER.info("For rebalanceId: {}, processing instance partitions for 
table: {}", rebalanceJobId, tableNameWithType);
@@ -254,7 +267,8 @@ public class TableRebalancer {
           "For rebalanceId: %s, caught exception while fetching/calculating 
instance partitions for table: %s, "
               + "aborting the rebalance", rebalanceJobId, tableNameWithType), 
e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
-          "Caught exception while fetching/calculating instance partitions: " 
+ e, null, null, null, preChecksResult);
+          "Caught exception while fetching/calculating instance partitions: " 
+ e, null, null, null, preChecksResult,
+          null);
     }
 
     // Calculate instance partitions for tiers if configured
@@ -273,7 +287,7 @@ public class TableRebalancer {
               + "aborting the rebalance", rebalanceJobId, tableNameWithType), 
e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while fetching/calculating tier instance 
partitions: " + e, null, null, null,
-          preChecksResult);
+          preChecksResult, null);
     }
 
     LOGGER.info("For rebalanceId: {}, calculating the target assignment for 
table: {}", rebalanceJobId,
@@ -291,7 +305,7 @@ public class TableRebalancer {
               + "rebalance", rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while calculating target assignment: " + e, 
instancePartitionsMap,
-          tierToInstancePartitionsMap, null, preChecksResult);
+          tierToInstancePartitionsMap, null, preChecksResult, null);
     }
 
     boolean segmentAssignmentUnchanged = 
currentAssignment.equals(targetAssignment);
@@ -299,6 +313,13 @@ public class TableRebalancer {
             + "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId, 
instancePartitionsUnchanged,
         tierInstancePartitionsUnchanged, segmentAssignmentUnchanged, 
tableNameWithType);
 
+    RebalanceSummaryResult summaryResult = null;
+    if (summary) {
+      // Calculate summary here itself so that even if the table is already 
balanced, the caller can verify whether that
+      // is expected or not based on the summary results
+      summaryResult = calculateDryRunSummary(currentAssignment, 
targetAssignment, tableNameWithType, rebalanceJobId);
+    }
+
     if (segmentAssignmentUnchanged) {
       LOGGER.info("Table: {} is already balanced", tableNameWithType);
       if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) {
@@ -306,28 +327,38 @@ public class TableRebalancer {
             String.format("For rebalanceId: %s, instance unchanged and table: 
%s is already balanced", rebalanceJobId,
                 tableNameWithType));
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.NO_OP, "Table is already balanced",
-            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment, preChecksResult);
+            summary ? null : instancePartitionsMap, summary ? null : 
tierToInstancePartitionsMap,
+            summary ? null : targetAssignment, preChecksResult, summaryResult);
       } else {
         if (dryRun) {
           return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.DONE,
-              "Instance reassigned in dry-run mode, table is already 
balanced", instancePartitionsMap,
-              tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+              "Instance reassigned in dry-run mode, table is already balanced",
+              summary ? null : instancePartitionsMap, summary ? null : 
tierToInstancePartitionsMap,
+              summary ? null : targetAssignment, preChecksResult, 
summaryResult);
         } else {
           _tableRebalanceObserver.onSuccess(
               String.format("For rebalanceId: %s, instance reassigned but 
table: %s is already balanced",
                   rebalanceJobId, tableNameWithType));
           return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.DONE,
-              "Instance reassigned, table is already balanced", 
instancePartitionsMap, tierToInstancePartitionsMap,
-              targetAssignment, preChecksResult);
+              "Instance reassigned, table is already balanced", summary ? null 
: instancePartitionsMap,
+              summary ? null : tierToInstancePartitionsMap, summary ? null : 
targetAssignment, preChecksResult,
+              summaryResult);
         }
       }
     }
 
+    if (summary) {
+      LOGGER.info("For rebalanceId: {}, rebalancing table: {} in dry-run 
summary mode, returning the summary only",
+          rebalanceJobId, tableNameWithType);
+      return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, 
"Dry-run summary mode", null,
+          null, null, preChecksResult, summaryResult);
+    }
+
     if (dryRun) {
       LOGGER.info("For rebalanceId: {}, rebalancing table: {} in dry-run mode, 
returning the target assignment",
           rebalanceJobId, tableNameWithType);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE, 
"Dry-run mode", instancePartitionsMap,
-          tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+          tierToInstancePartitionsMap, targetAssignment, preChecksResult, 
summaryResult);
     }
 
     if (downtime) {
@@ -352,14 +383,14 @@ public class TableRebalancer {
         return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
             "Success with downtime (replaced IdealState with the target 
segment assignment, ExternalView might not "
                 + "reach the target segment assignment yet)", 
instancePartitionsMap, tierToInstancePartitionsMap,
-            targetAssignment, preChecksResult);
+            targetAssignment, preChecksResult, summaryResult);
       } catch (Exception e) {
         onReturnFailure(String.format(
             "For rebalanceId: %s, caught exception while updating IdealState 
for table: %s, aborting the rebalance",
             rebalanceJobId, tableNameWithType), e);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Caught exception while updating IdealState: " + e, 
instancePartitionsMap, tierToInstancePartitionsMap,
-            targetAssignment, preChecksResult);
+            targetAssignment, preChecksResult, summaryResult);
       }
     }
 
@@ -391,7 +422,7 @@ public class TableRebalancer {
             minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas), 
null);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Illegal min available replicas config", instancePartitionsMap, 
tierToInstancePartitionsMap,
-            targetAssignment, preChecksResult);
+            targetAssignment, preChecksResult, summaryResult);
       }
       minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
     } else {
@@ -432,12 +463,12 @@ public class TableRebalancer {
         if (_tableRebalanceObserver.isStopped()) {
           return new RebalanceResult(rebalanceJobId, 
_tableRebalanceObserver.getStopStatus(),
               "Caught exception while waiting for ExternalView to converge: " 
+ e, instancePartitionsMap,
-              tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+              tierToInstancePartitionsMap, targetAssignment, preChecksResult, 
summaryResult);
         }
         _tableRebalanceObserver.onError(errorMsg);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Caught exception while waiting for ExternalView to converge: " + 
e, instancePartitionsMap,
-            tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+            tierToInstancePartitionsMap, targetAssignment, preChecksResult, 
summaryResult);
       }
 
       // Re-calculate the target assignment if IdealState changed while 
waiting for ExternalView to converge
@@ -486,7 +517,7 @@ public class TableRebalancer {
                     + "aborting the rebalance", rebalanceJobId, 
tableNameWithType), e);
             return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
                 "Caught exception while re-calculating the target assignment: 
" + e, instancePartitionsMap,
-                tierToInstancePartitionsMap, targetAssignment, 
preChecksResult);
+                tierToInstancePartitionsMap, targetAssignment, 
preChecksResult, summaryResult);
           }
         } else {
           LOGGER.info("For rebalanceId:{}, no state change found for segments 
to be moved, "
@@ -510,7 +541,7 @@ public class TableRebalancer {
         return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
             "Success with minAvailableReplicas: " + minAvailableReplicas
                 + " (both IdealState and ExternalView should reach the target 
segment assignment)",
-            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment, preChecksResult);
+            instancePartitionsMap, tierToInstancePartitionsMap, 
targetAssignment, preChecksResult, summaryResult);
       }
 
       // Record change of current ideal state and the new target
@@ -519,7 +550,7 @@ public class TableRebalancer {
       if (_tableRebalanceObserver.isStopped()) {
         return new RebalanceResult(rebalanceJobId, 
_tableRebalanceObserver.getStopStatus(),
             "Rebalance has stopped already before updating the IdealState", 
instancePartitionsMap,
-            tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+            tierToInstancePartitionsMap, targetAssignment, preChecksResult, 
summaryResult);
       }
       Map<String, Map<String, String>> nextAssignment =
           getNextAssignment(currentAssignment, targetAssignment, 
minAvailableReplicas, enableStrictReplicaGroup,
@@ -550,7 +581,7 @@ public class TableRebalancer {
             + "aborting the rebalance", rebalanceJobId, tableNameWithType), e);
         return new RebalanceResult(rebalanceJobId, 
RebalanceResult.Status.FAILED,
             "Caught exception while updating IdealState: " + e, 
instancePartitionsMap, tierToInstancePartitionsMap,
-            targetAssignment, preChecksResult);
+            targetAssignment, preChecksResult, summaryResult);
       }
 
       segmentsToMonitor = new HashSet<>(segmentsToMove);
@@ -559,6 +590,147 @@ public class TableRebalancer {
     }
   }
 
+  /**
+   * Fetches the reported per-replica size of the given table for use in the rebalance summary.
+   *
+   * @param tableNameWithType table name with type suffix, passed through to the table size reader
+   * @return the reported size per replica in bytes, or {@code -1} when no table size reader was provided or the
+   *         reader failed with an {@link InvalidConfigException}
+   */
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for table {}!", tableNameWithType);
+      return -1;
+    }
+    LOGGER.info("Fetching the table size for rebalance summary for table: {}", tableNameWithType);
+    try {
+      // TODO: Consider making the timeoutMs for fetching table size via table rebalancer configurable
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      long tableSizePerReplicaInBytes = tableSizeDetails._reportedSizePerReplicaInBytes;
+      // Only report a successful fetch when the reader actually returned; the failure path logs its own error
+      LOGGER.info("Fetched the table size (per replica size: {}) for rebalance summary for table: {}",
+          tableSizePerReplicaInBytes, tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      LOGGER.error("Caught exception while trying to fetch table size details for table: {}", tableNameWithType, e);
+      return -1;
+    }
+  }
+
+  /**
+   * Computes the dry-run summary of a rebalance by comparing the current and target segment assignments.
+   *
+   * <p>Both assignments are keyed by segment name, with each value keyed by the server hosting that segment. They are
+   * inverted into server-to-segments maps, from which the per-server and table-wide change counts are derived. The
+   * replication factor reported for each assignment is the replica count of the last segment iterated.
+   *
+   * @param currentAssignment segment assignment before the rebalance
+   * @param targetAssignment segment assignment the rebalance would produce
+   * @param tableNameWithType table name with type suffix, used for logging and for fetching the table size
+   * @param rebalanceJobId rebalance job id, used for logging only
+   * @return the summary holding server-level and segment-level change information; size estimates are non-positive
+   *         (the value reported by the size lookup, or {@code -1}) when they cannot be computed
+   */
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
+        rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    // Invert segment -> servers into server -> segments for the current assignment
+    for (Map.Entry<String, Map<String, String>> segmentEntry : currentAssignment.entrySet()) {
+      String segmentName = segmentEntry.getKey();
+      Set<String> servers = segmentEntry.getValue().keySet();
+      existingReplicationFactor = servers.size();
+      for (String server : servers) {
+        existingServersToSegmentMap.computeIfAbsent(server, k -> new HashSet<>()).add(segmentName);
+      }
+    }
+
+    // Invert segment -> servers into server -> segments for the target assignment
+    for (Map.Entry<String, Map<String, String>> segmentEntry : targetAssignment.entrySet()) {
+      String segmentName = segmentEntry.getKey();
+      Set<String> servers = segmentEntry.getValue().keySet();
+      newReplicationFactor = servers.size();
+      for (String server : servers) {
+        newServersToSegmentMap.computeIfAbsent(server, k -> new HashSet<>()).add(segmentName);
+      }
+    }
+    RebalanceSummaryResult.RebalanceChangeInfo replicationFactor =
+        new RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, newReplicationFactor);
+
+    int existingNumServers = existingServersToSegmentMap.size();
+    int newNumServers = newServersToSegmentMap.size();
+    RebalanceSummaryResult.RebalanceChangeInfo numServers =
+        new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers, newNumServers);
+
+    List<InstanceConfig> instanceConfigs =
+        _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true);
+    Map<String, List<String>> instanceToTagsMap = new HashMap<>();
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceToTagsMap.put(instanceConfig.getInstanceName(), instanceConfig.getTags());
+    }
+
+    Set<String> serversAdded = new HashSet<>();
+    Set<String> serversRemoved = new HashSet<>();
+    Set<String> serversUnchanged = new HashSet<>();
+    Set<String> serversGettingNewSegments = new HashSet<>();
+    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>();
+    int segmentsNotMoved = 0;
+    int maxSegmentsAddedToServer = 0;
+
+    // Servers present in the target assignment: either newly added or already hosting segments of this table
+    for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> targetSegments = entry.getValue();
+      Set<String> existingSegments = existingServersToSegmentMap.get(server);
+
+      int totalNewSegments = targetSegments.size();
+      int totalExistingSegments = 0;
+      int segmentsUnchanged = 0;
+      RebalanceSummaryResult.ServerStatus serverStatus;
+      if (existingSegments != null) {
+        totalExistingSegments = existingSegments.size();
+        // Segments hosted by this server both before and after the rebalance do not need to move
+        Set<String> retainedSegments = new HashSet<>(existingSegments);
+        retainedSegments.retainAll(targetSegments);
+        segmentsUnchanged = retainedSegments.size();
+        segmentsNotMoved += segmentsUnchanged;
+        serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+        serversUnchanged.add(server);
+      } else {
+        serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
+        serversAdded.add(server);
+      }
+
+      int segmentsAdded = totalNewSegments - segmentsUnchanged;
+      int segmentsDeleted = totalExistingSegments - segmentsUnchanged;
+      if (segmentsAdded > 0) {
+        serversGettingNewSegments.add(server);
+      }
+      maxSegmentsAddedToServer = Math.max(maxSegmentsAddedToServer, segmentsAdded);
+
+      serverSegmentChangeInfoMap.put(server,
+          new RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus, totalNewSegments, totalExistingSegments,
+              segmentsAdded, segmentsDeleted, segmentsUnchanged, instanceToTagsMap.get(server)));
+    }
+
+    // Servers that only appear in the current assignment lose all of their segments
+    for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      if (!serverSegmentChangeInfoMap.containsKey(server)) {
+        int numSegmentsOnServer = entry.getValue().size();
+        serversRemoved.add(server);
+        serverSegmentChangeInfoMap.put(server,
+            new RebalanceSummaryResult.ServerSegmentChangeInfo(RebalanceSummaryResult.ServerStatus.REMOVED, 0,
+                numSegmentsOnServer, 0, numSegmentsOnServer, 0, instanceToTagsMap.get(server)));
+      }
+    }
+
+    RebalanceSummaryResult.RebalanceChangeInfo numSegmentsInSingleReplica =
+        new RebalanceSummaryResult.RebalanceChangeInfo(currentAssignment.size(), targetAssignment.size());
+
+    int existingNumberSegmentsTotal = existingReplicationFactor * currentAssignment.size();
+    int newNumberSegmentsTotal = newReplicationFactor * targetAssignment.size();
+    RebalanceSummaryResult.RebalanceChangeInfo numSegmentsAcrossAllReplicas =
+        new RebalanceSummaryResult.RebalanceChangeInfo(existingNumberSegmentsTotal, newNumberSegmentsTotal);
+
+    int totalSegmentsToBeMoved = newNumberSegmentsTotal - segmentsNotMoved;
+
+    // A non-positive table size means the size is unknown or empty; propagate it instead of estimating from it
+    long tableSizePerReplicaInBytes = calculateTableSizePerReplicaInBytes(tableNameWithType);
+    long averageSegmentSizeInBytes;
+    if (tableSizePerReplicaInBytes <= 0) {
+      averageSegmentSizeInBytes = tableSizePerReplicaInBytes;
+    } else if (currentAssignment.isEmpty()) {
+      // No segments to average over; avoid dividing by zero and report the size as unknown
+      averageSegmentSizeInBytes = -1;
+    } else {
+      averageSegmentSizeInBytes = tableSizePerReplicaInBytes / currentAssignment.size();
+    }
+    long totalEstimatedDataToBeMovedInBytes = averageSegmentSizeInBytes <= 0 ? averageSegmentSizeInBytes
+        : ((long) totalSegmentsToBeMoved) * averageSegmentSizeInBytes;
+
+    RebalanceSummaryResult.ServerInfo serverInfo =
+        new RebalanceSummaryResult.ServerInfo(serversGettingNewSegments.size(), numServers, serversAdded,
+            serversRemoved, serversUnchanged, serversGettingNewSegments, serverSegmentChangeInfoMap);
+    // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
+    //       rebalance time can vary with number of segments added
+    RebalanceSummaryResult.SegmentInfo segmentInfo =
+        new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved, maxSegmentsAddedToServer,
+            averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes, replicationFactor,
+            numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+
+    LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
+        rebalanceJobId);
+    return new RebalanceSummaryResult(serverInfo, segmentInfo);
+  }
+
   private void onReturnFailure(String errorMsg, Exception e) {
     if (e != null) {
       LOGGER.warn(errorMsg, e);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index e115476529..661fff70d9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -60,7 +60,7 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
                 false));
       } catch (TableNotFoundException exception) {
         rebalanceResult.put(table, new RebalanceResult(null, 
RebalanceResult.Status.FAILED, exception.getMessage(),
-            null, null, null, null));
+            null, null, null, null, null));
       }
     });
     if (config.isDryRun()) {
@@ -71,7 +71,8 @@ public class DefaultTenantRebalancer implements 
TenantRebalancer {
         if (result.getStatus() == RebalanceResult.Status.DONE) {
           rebalanceResult.put(table, new RebalanceResult(result.getJobId(), 
RebalanceResult.Status.IN_PROGRESS,
               "In progress, check controller task status for the", 
result.getInstanceAssignment(),
-              result.getTierInstanceAssignment(), 
result.getSegmentAssignment(), result.getPreChecksResult()));
+              result.getTierInstanceAssignment(), 
result.getSegmentAssignment(), result.getPreChecksResult(),
+              result.getRebalanceSummaryResult()));
         }
       }
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
index c96c25b250..ed0caf0f29 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
@@ -36,7 +36,7 @@ public class TenantRebalanceResult {
       _rebalanceTableResults = new HashMap<>();
       rebalanceTableResults.forEach((table, result) -> {
         _rebalanceTableResults.put(table, new 
RebalanceResult(result.getJobId(), result.getStatus(),
-            result.getDescription(), null, null, null, null));
+            result.getDescription(), null, null, null, null, null));
       });
     }
   }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index de83ec6eee..2f7ab350bc 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.assignment.InstancePartitions;
@@ -91,15 +92,26 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
i, true);
     }
 
+    ExecutorService executorService = Executors.newFixedThreadPool(10);
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
-    preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10));
-    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker);
+    preChecker.init(_helixResourceManager, executorService);
+    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker,
+        _helixResourceManager.getTableSizeReader());
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
     // Rebalance should fail without creating the table
     RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
new RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+    assertNull(rebalanceResult.getRebalanceSummaryResult());
+
+    // Rebalance with dry-run summary should fail without creating the table
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+    assertNull(rebalanceResult.getRebalanceSummaryResult());
 
     // Create the table
     addDummySchema(RAW_TABLE_NAME);
@@ -114,6 +126,27 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     Map<String, Map<String, String>> oldSegmentAssignment =
         
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
 
+    // Rebalance with dry-run summary should return NO_OP status
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    RebalanceSummaryResult rebalanceSummaryResult = 
rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
+
+    // Dry-run mode should not change the IdealState
+    
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+        oldSegmentAssignment);
+
     // Rebalance should return NO_OP status
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
@@ -137,8 +170,54 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX + 
(numServers + i), true);
     }
 
+    // Rebalance in dry-run summary mode with added servers
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 14);
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
+
+    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
serverSegmentChangeInfoMap =
+        rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo();
+    assertNotNull(serverSegmentChangeInfoMap);
+    for (int i = 0; i < numServers; i++) {
+      // Original servers should be losing some segments
+      String newServer = SERVER_INSTANCE_ID_PREFIX + i;
+      RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
+      assertTrue(serverSegmentChange.getSegmentsDeleted() > 0);
+      assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
+      assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
+      assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0);
+      assertEquals(serverSegmentChange.getSegmentsAdded(), 0);
+    }
+    for (int i = 0; i < numServersToAdd; i++) {
+      // New servers should only get new segments
+      String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+      RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
+      assertTrue(serverSegmentChange.getSegmentsAdded() > 0);
+      assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0);
+      assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 
serverSegmentChange.getSegmentsAdded());
+      assertEquals(serverSegmentChange.getSegmentsDeleted(), 0);
+      assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0);
+    }
+
+    // Dry-run mode should not change the IdealState
+    
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+        oldSegmentAssignment);
+
     // Rebalance in dry-run mode
-    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig = new RebalanceConfig();
     rebalanceConfig.setDryRun(true);
     rebalanceConfig.setPreChecks(true);
     rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
@@ -186,6 +265,26 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
         oldSegmentAssignment);
 
+    // Rebalance dry-run summary with 3 min available replicas should not be 
impacted since actual rebalance does not
+    // occur
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceConfig.setPreChecks(true);
+    rebalanceConfig.setMinAvailableReplicas(3);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+    assertNotNull(rebalanceResult.getPreChecksResult());
+
     // Rebalance with 3 min available replicas should fail as the table only 
have 3 replicas
     rebalanceConfig = new RebalanceConfig();
     rebalanceConfig.setMinAvailableReplicas(3);
@@ -218,6 +317,41 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
+    // Try dry-run summary mode
+    // No need to reassign instances because instances should be automatically 
assigned when updating the table config
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 11);
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+
+    serverSegmentChangeInfoMap = 
rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo();
+    assertNotNull(serverSegmentChangeInfoMap);
+    for (int i = 0; i < numServers + numServersToAdd; i++) {
+      String newServer = SERVER_INSTANCE_ID_PREFIX + i;
+      RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange = 
serverSegmentChangeInfoMap.get(newServer);
+      assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
+      // Ensure not all segments moved
+      assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
+      // Ensure every server has segments assigned prior to rebalance
+      assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
+    }
+
+    // Dry-run mode should not change the IdealState
+    
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+        newSegmentAssignment);
+
+    // Try actual rebalance
     // No need to reassign instances because instances should be automatically 
assigned when updating the table config
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
@@ -264,6 +398,25 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     tableConfig.setInstanceAssignmentConfigMap(null);
     _helixResourceManager.updateTableConfig(tableConfig);
 
+    // Try dry-run summary mode without reassignment to ensure that existing 
instance partitions are used
+    // no movement should occur
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+
     // Without instances reassignment, the rebalance should return status 
NO_OP, and the existing instance partitions
     // should be used
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
@@ -271,6 +424,26 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertEquals(rebalanceResult.getInstanceAssignment(), instanceAssignment);
     assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment);
 
+    // Try dry-run summary mode with reassignment
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    // No move expected since already balanced
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+
     // With instances reassignment, the rebalance should return status DONE, 
the existing instance partitions should be
     // removed, and the default instance partitions should be used
     rebalanceConfig = new RebalanceConfig();
@@ -300,6 +473,25 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
           TagNameUtils.getOfflineTagForTenant(null));
     }
 
+    // Try dry-run summary mode
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceConfig.setDowntime(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 15);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 3);
+
     // Rebalance with downtime should succeed
     rebalanceConfig = new RebalanceConfig();
     rebalanceConfig.setDowntime(true);
@@ -327,7 +519,18 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
       }
     }
 
+    // Try summary mode without dry-run set
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+    assertNull(rebalanceResult.getRebalanceSummaryResult());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+
     _helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+    executorService.shutdown();
   }
 
   /**
@@ -367,7 +570,27 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
 
     TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
-    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
new RebalanceConfig(), null);
+
+    // Try dry-run summary mode
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    RebalanceSummaryResult rebalanceSummaryResult = 
rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+
+    // Run actual table rebalance
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
     // Segment assignment should not change
     assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -382,6 +605,24 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     }
     _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, 
TIER_B_NAME, 3, 3, 0));
 
+    // Try dry-run summary mode, should be no-op
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+
     // rebalance is NOOP and no change in assignment caused by new instances
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
@@ -400,6 +641,24 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
             TierFactory.PINOT_SERVER_STORAGE_TYPE, NO_TIER_NAME + "_OFFLINE", 
null, null)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
+    // Try dry-run summary mode, some moves should occur
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 15);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 9);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 6);
+
     // rebalance should change assignment
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
@@ -458,7 +717,26 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
 
     TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
-    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
new RebalanceConfig(), null);
+
+    // Try dry-run summary mode
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    RebalanceSummaryResult rebalanceSummaryResult = 
rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
     // Segment assignment should not change
     assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -469,6 +747,25 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
           "replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX 
+ i, false);
     }
     _helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER, 
"replicaAssignment" + TIER_A_NAME, 6, 6, 0));
+
+    // Try dry-run summary mode
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceResult.getRebalanceSummaryResult());
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 0);
+
     // rebalance is NOOP and no change in assignment caused by new instances
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
@@ -481,6 +778,24 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
             TierFactory.PINOT_SERVER_STORAGE_TYPE, "replicaAssignment" + 
TIER_A_NAME + "_OFFLINE", null, null)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
+    // Try dry-run summary mode
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceResult.getRebalanceSummaryResult());
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 30);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
 6);
+
     // rebalance should change assignment
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
@@ -504,6 +819,23 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
         new InstanceAssignmentConfig(tagPoolConfig, null, 
replicaGroupPartitionConfig, null, false)));
     _helixResourceManager.updateTableConfig(tableConfig);
 
+    // Try dry-run summary mode
+    rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+    rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(rebalanceSummaryResult);
+    assertNotNull(rebalanceSummaryResult.getServerInfo());
+    assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+    assertNull(rebalanceResult.getInstanceAssignment());
+    assertNull(rebalanceResult.getTierInstanceAssignment());
+    assertNull(rebalanceResult.getSegmentAssignment());
+    
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 13);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 6);
+    
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 6);
+
     rebalanceResult = tableRebalancer.rebalance(tableConfig, new 
RebalanceConfig(), null);
     assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
     
assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 6750a7340d..7b8e4168a7 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
 import java.sql.ResultSet;
 import java.sql.Statement;
 import java.sql.Timestamp;
@@ -40,6 +42,7 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -64,10 +67,12 @@ import 
org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.ServiceStatus;
 import org.apache.pinot.common.utils.SimpleHttpResponse;
 import org.apache.pinot.common.utils.http.HttpClient;
+import 
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
 import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
@@ -176,6 +181,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
 
   private PinotHelixResourceManager _resourceManager;
   private TableRebalancer _tableRebalancer;
+  private ExecutorService _executorService;
 
   protected int getNumBrokers() {
     return NUM_BROKERS;
@@ -287,8 +293,10 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
 
     _resourceManager = _controllerStarter.getHelixResourceManager();
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
-    preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10));
-    _tableRebalancer = new 
TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker);
+    _executorService = Executors.newFixedThreadPool(10);
+    preChecker.init(_helixResourceManager, _executorService);
+    _tableRebalancer = new 
TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker,
+        _resourceManager.getTableSizeReader());
   }
 
   private void reloadAllSegments(String testQuery, boolean forceDownload, long 
numTotalDocs)
@@ -3062,6 +3070,7 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     stopController();
     stopZk();
     FileUtils.deleteDirectory(_tempDir);
+    _executorService.shutdown();
   }
 
   private void testInstanceDecommission()
@@ -4018,4 +4027,199 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
 
     assertEquals(result.get("clientRequestId").asText(), clientRequestId);
   }
+
+  @Test
+  public void testRebalanceDryRunSummary()
+      throws Exception {
+    // Set up a dry-run rebalance config; summary is not requested yet
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+
+    TableConfig tableConfig = getOfflineTableConfig();
+
+    // Plain dry-run: the result must not carry a summary when summary was not requested
+    RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+    assertNull(rebalanceResult.getRebalanceSummaryResult());
+
+    // Enable summary with no other changes: expect NO_OP, no segments to move, unchanged server count
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP, 
false, getNumServers(), getNumServers(),
+        tableConfig.getReplication());
+
+    // Add a new server (to force change in instance assignment) and enable 
reassignInstances
+    BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.DONE, 
true, getNumServers(),
+        getNumServers() + 1, tableConfig.getReplication());
+
+    // Disable dry-run with summary still enabled: expect FAILED and no summary in the result
+    rebalanceConfig.setDryRun(false);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertNull(rebalanceResult.getRebalanceSummaryResult());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+
+    // Disable summary along with dry-run to do a real rebalance (downtime + reassignInstances)
+    rebalanceConfig.setSummary(false);
+    rebalanceConfig.setDowntime(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertNull(rebalanceResult.getRebalanceSummaryResult());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+    waitForRebalanceToComplete(rebalanceResult, 600_000L);
+
+    // Untag the added server; the next dry-run expects the server count to drop back
+    _resourceManager.updateInstanceTags(serverStarter1.getInstanceId(), "", 
false);
+
+    // Re-enable dry-run and summary: expect DONE, segments to move, and one fewer server after rebalance
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.DONE, 
true, getNumServers() + 1,
+        getNumServers(), tableConfig.getReplication());
+
+    // Disable dry-run and summary to do a real rebalance that applies the move previewed above
+    rebalanceConfig.setDryRun(false);
+    rebalanceConfig.setSummary(false);
+    rebalanceConfig.setDowntime(true);
+    rebalanceConfig.setReassignInstances(true);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    assertNull(rebalanceResult.getRebalanceSummaryResult());
+    assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+    waitForRebalanceToComplete(rebalanceResult, 600_000L);
+
+    // Stop the added server and wait for its instance to be dropped successfully
+    serverStarter1.stop();
+    TestUtils.waitForCondition(aVoid -> 
_resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful(),
+        60_000L, "Failed to drop added server");
+
+    // Dry-run with summary again: back to NO_OP with the original server count on both sides
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setSummary(true);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP, 
false, getNumServers(), getNumServers(),
+        tableConfig.getReplication());
+
+    // Dry-run with summary and pre-checks: same NO_OP summary, plus both pre-check entries reported as false
+    rebalanceConfig.setPreChecks(true);
+    rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP, 
false, getNumServers(), getNumServers(),
+        tableConfig.getReplication());
+    assertNotNull(rebalanceResult.getPreChecksResult());
+    
assertTrue(rebalanceResult.getPreChecksResult().containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS));
+    
assertTrue(rebalanceResult.getPreChecksResult().containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT));
+    
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS),
 "false");
+    
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT),
+        "false");
+  }
+
+  /**
+   * Validates the summary attached to a dry-run rebalance result.
+   *
+   * <p>Checks the result status, the before/after replication factor and server counts, the segment movement
+   * estimates, and that the per-server status breakdown agrees with the aggregated server counts and server lists.
+   *
+   * @param rebalanceResult result returned by the rebalancer; its summary must be non-null
+   * @param expectedStatus status the rebalance result is expected to report
+   * @param isSegmentsToBeMoved whether the summary is expected to report segments to be moved
+   * @param existingNumServers expected number of servers before the rebalance
+   * @param newNumServers expected number of servers after the rebalance
+   * @param replicationFactor expected replication factor before the rebalance (asserted to be unchanged after it)
+   */
+  private void checkRebalanceDryRunSummary(RebalanceResult rebalanceResult, RebalanceResult.Status expectedStatus,
+      boolean isSegmentsToBeMoved, int existingNumServers, int newNumServers, int replicationFactor) {
+    assertEquals(rebalanceResult.getStatus(), expectedStatus);
+    RebalanceSummaryResult summaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(summaryResult);
+    assertNotNull(summaryResult.getServerInfo());
+    assertNotNull(summaryResult.getSegmentInfo());
+
+    // Replication factor must match the expectation and must be identical before and after the rebalance
+    assertEquals(summaryResult.getSegmentInfo().getReplicationFactor().getValueBeforeRebalance(), replicationFactor,
+        "Existing replication factor doesn't match expected");
+    assertEquals(summaryResult.getSegmentInfo().getReplicationFactor().getValueBeforeRebalance(),
+        summaryResult.getSegmentInfo().getReplicationFactor().getExpectedValueAfterRebalance(),
+        "Existing and new replication factor doesn't match");
+
+    // Server counts before and after the rebalance
+    assertEquals(summaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(), existingNumServers,
+        "Existing number of servers don't match");
+    assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(), newNumServers,
+        "New number of servers don't match");
+    if (_tableSize > 0) {
+      assertTrue(summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes() > 0L,
+          "Avg segment size expected to be > 0");
+    }
+
+    // The reported count of servers getting new segments must agree with the reported list of those servers
+    assertEquals(summaryResult.getServerInfo().getNumServersGettingNewSegments(),
+        summaryResult.getServerInfo().getServersGettingNewSegments().size(),
+        "Number of servers getting new segments doesn't match the size of the list of such servers");
+    if (existingNumServers != newNumServers) {
+      assertTrue(summaryResult.getServerInfo().getNumServersGettingNewSegments() > 0,
+          "Expected number of servers getting new segments should be > 0");
+    } else {
+      assertEquals(summaryResult.getServerInfo().getNumServersGettingNewSegments(), 0,
+          "Expected number of servers getting new segments should be 0");
+    }
+
+    if (isSegmentsToBeMoved) {
+      assertTrue(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved() > 0,
+          "Segments to be moved should be > 0");
+      // The data estimate is expected to be (segments to be moved) * (estimated average segment size)
+      assertEquals(summaryResult.getSegmentInfo().getTotalEstimatedDataToBeMovedInBytes(),
+          summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved()
+              * summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes(),
+          "Estimated data to be moved in bytes doesn't match");
+      assertTrue(summaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer() > 0,
+          "Estimated max number of segments to move in a single server should be > 0");
+    } else {
+      assertEquals(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(), 0, "Segments to be moved should be 0");
+      assertEquals(summaryResult.getSegmentInfo().getTotalEstimatedDataToBeMovedInBytes(), 0L,
+          "Estimated data to be moved in bytes should be 0");
+      assertEquals(summaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer(), 0,
+          "Estimated max number of segments to move in a single server should be 0");
+    }
+
+    // Validate server status stats with numServers information
+    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap =
+        summaryResult.getServerInfo().getServerSegmentChangeInfo();
+    int numServersAdded = 0;
+    int numServersRemoved = 0;
+    int numServersUnchanged = 0;
+    for (RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChangeInfo : serverSegmentChangeInfoMap.values()) {
+      switch (serverSegmentChangeInfo.getServerStatus()) {
+        case UNCHANGED:
+          numServersUnchanged++;
+          break;
+        case ADDED:
+          numServersAdded++;
+          break;
+        case REMOVED:
+          numServersRemoved++;
+          break;
+        default:
+          Assert.fail(String.format("Unknown server status encountered: %s",
+              serverSegmentChangeInfo.getServerStatus()));
+          break;
+      }
+    }
+
+    // Servers present before the rebalance are the removed + unchanged ones; servers present after it are the
+    // added + unchanged ones
+    assertEquals(summaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
+        numServersRemoved + numServersUnchanged,
+        "Existing number of servers doesn't match the count of removed + unchanged servers");
+    assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+        numServersAdded + numServersUnchanged,
+        "New number of servers doesn't match the count of added + unchanged servers");
+
+    // Per-status counts derived from the per-server map must agree with the per-status server lists
+    assertEquals(numServersAdded, summaryResult.getServerInfo().getServersAdded().size(),
+        "Count of ADDED servers doesn't match the size of the servers added list");
+    assertEquals(numServersRemoved, summaryResult.getServerInfo().getServersRemoved().size(),
+        "Count of REMOVED servers doesn't match the size of the servers removed list");
+    assertEquals(numServersUnchanged, summaryResult.getServerInfo().getServersUnchanged().size(),
+        "Count of UNCHANGED servers doesn't match the size of the servers unchanged list");
+  }
+
+  /**
+   * Waits for the rebalance job behind the given result to leave the {@code IN_PROGRESS} state.
+   *
+   * <p>Returns immediately when the result's status is anything other than {@code IN_PROGRESS}. Otherwise the
+   * controller's table rebalance status URL for the job is queried repeatedly through
+   * {@code TestUtils.waitForCondition}, bounded by {@code timeoutMs}.
+   *
+   * @param rebalanceResult result returned when the rebalance was triggered
+   * @param timeoutMs maximum time to wait, in milliseconds
+   */
+  protected void waitForRebalanceToComplete(RebalanceResult rebalanceResult, long timeoutMs) {
+    if (rebalanceResult.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
+      // Status is already something other than IN_PROGRESS, so there is nothing to wait for
+      return;
+    }
+
+    String jobId = rebalanceResult.getJobId();
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        String requestUrl = getControllerRequestURLBuilder().forTableRebalanceStatus(jobId);
+        try {
+          SimpleHttpResponse httpResponse =
+              HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new URL(requestUrl).toURI(), null));
+
+          ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse =
+              JsonUtils.stringToObject(httpResponse.getResponse(), ServerRebalanceJobStatusResponse.class);
+          RebalanceResult.Status status = serverRebalanceJobStatusResponse.getTableRebalanceProgressStats().getStatus();
+          return status != RebalanceResult.Status.IN_PROGRESS;
+        } catch (HttpErrorStatusException | URISyntaxException e) {
+          throw new IOException(e);
+        }
+      } catch (Exception e) {
+        // Any failure to fetch or parse the job status is treated as "not finished yet" so the check is retried
+        return false;
+      }
+    }, 1000L, timeoutMs, "Timed out waiting for rebalance job to leave IN_PROGRESS state: " + jobId);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to