klsince commented on code in PR #15050:
URL: https://github.com/apache/pinot/pull/15050#discussion_r1962208544


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,146 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    LOGGER.info("Fetching the table size for rebalance summary for table: {}", 
tableNameWithType);
+    try {
+      // TODO: Consider making the timeoutMs for fetching table size via table 
rebalancer configurable
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      LOGGER.error("Caught exception while trying to fetch table size details 
for table: {}", tableNameWithType, e);
+    }
+    LOGGER.info("Fetched the table size for rebalance summary for table: {}", 
tableNameWithType);

Review Comment:
   Add `tableSizePerReplicaInBytes` to the LOG.info message too.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java:
##########
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * Holds the summary data of the rebalance result
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class RebalanceSummaryResult {
+
+  public static class ServerSegmentChangeInfo {
+    private final ServerStatus _serverStatus;
+    private final int _totalSegmentsAfterRebalance;
+    private final int _totalSegmentsBeforeRebalance;
+    private final int _segmentsAdded;
+    private final int _segmentsDeleted;
+    private final int _segmentsUnchanged;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final List<String> _tagList;
+
+    /**
+     * Constructor for ServerSegmentChangeInfo
+     * @param serverStatus server status, whether it was added, removed, or 
unchanged as part of this rebalance
+     * @param totalSegmentsAfterRebalance expected total segments on this 
server after rebalance
+     * @param totalSegmentsBeforeRebalance current number of segments on this 
server before rebalance
+     * @param segmentsAdded number of segments expected to be added as part of 
this rebalance
+     * @param segmentsDeleted number of segments expected to be deleted as 
part of this rebalance
+     * @param segmentsUnchanged number of segments that aren't moving from 
this server as part of this rebalance
+     * @param tagList server tag list
+     */
+    @JsonCreator
+    public ServerSegmentChangeInfo(@JsonProperty("serverStatus") ServerStatus 
serverStatus,
+        @JsonProperty("totalSegmentsAfterRebalance") int 
totalSegmentsAfterRebalance,
+        @JsonProperty("totalSegmentsBeforeRebalance") int 
totalSegmentsBeforeRebalance,
+        @JsonProperty("segmentsAdded") int segmentsAdded, 
@JsonProperty("segmentsDeleted") int segmentsDeleted,
+        @JsonProperty("segmentsUnchanged") int segmentsUnchanged,
+        @JsonProperty("tagList") @Nullable List<String> tagList) {
+      _serverStatus = serverStatus;
+      _totalSegmentsAfterRebalance = totalSegmentsAfterRebalance;
+      _totalSegmentsBeforeRebalance = totalSegmentsBeforeRebalance;
+      _segmentsAdded = segmentsAdded;
+      _segmentsDeleted = segmentsDeleted;
+      _segmentsUnchanged = segmentsUnchanged;
+      _tagList = tagList;
+    }
+
+    @JsonProperty
+    public ServerStatus getServerStatus() {
+      return _serverStatus;
+    }
+
+    @JsonProperty
+    public int getTotalSegmentsAfterRebalance() {
+      return _totalSegmentsAfterRebalance;
+    }
+
+    @JsonProperty
+    public int getTotalSegmentsBeforeRebalance() {
+      return _totalSegmentsBeforeRebalance;
+    }
+
+    @JsonProperty
+    public int getSegmentsAdded() {
+      return _segmentsAdded;
+    }
+
+    @JsonProperty
+    public int getSegmentsDeleted() {
+      return _segmentsDeleted;
+    }
+
+    @JsonProperty
+    public int getSegmentsUnchanged() {
+      return _segmentsUnchanged;
+    }
+
+    @JsonProperty
+    public List<String> getTagList() {
+      return _tagList;
+    }
+  }
+
+  public static class RebalanceChangeInfo {
+    private final int _valueBeforeRebalance;
+    private final int _expectedValueAfterRebalance;
+
+    /**
+     * Constructor for RebalanceChangeInfo
+     * @param valueBeforeRebalance current value before rebalance
+     * @param expectedValueAfterRebalance expected value after rebalance
+     */
+    @JsonCreator
+    public RebalanceChangeInfo(@JsonProperty("valueBeforeRebalance") int 
valueBeforeRebalance,
+        @JsonProperty("expectedValueAfterRebalance") int 
expectedValueAfterRebalance) {
+      _valueBeforeRebalance = valueBeforeRebalance;
+      _expectedValueAfterRebalance = expectedValueAfterRebalance;
+    }
+
+    @JsonProperty
+    public int getValueBeforeRebalance() {
+      return _valueBeforeRebalance;
+    }
+
+    @JsonProperty
+    public int getExpectedValueAfterRebalance() {
+      return _expectedValueAfterRebalance;
+    }
+  }
+
+  public static class ServerInfo {
+    private final int _numServersGettingNewSegments;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final RebalanceChangeInfo _numServers;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Set<String> _serversAdded;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Set<String> _serversRemoved;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Set<String> _serversUnchanged;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Set<String> _serversGettingNewSegments;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final Map<String, ServerSegmentChangeInfo> 
_serverSegmentChangeInfo;
+
+    /**
+     * Constructor for ServerInfo
+     * @param numServersGettingNewSegments total number of servers receiving 
new segments as part of this rebalance
+     * @param numServers number of servers before and after this rebalance
+     * @param serversAdded set of servers getting added as part of this 
rebalance
+     * @param serversRemoved set of servers getting removed as part of this 
rebalance
+     * @param serversUnchanged set of servers existing both before and as part 
of this rebalance
+     * @param serversGettingNewSegments set of servers getting segments added
+     * @param serverSegmentChangeInfo per server statistics for this rebalance
+     */
+    @JsonCreator
+    public ServerInfo(@JsonProperty("numServersGettingNewSegments") int 
numServersGettingNewSegments,
+        @JsonProperty("numServers") @Nullable RebalanceChangeInfo numServers,
+        @JsonProperty("serversAdded") @Nullable Set<String> serversAdded,
+        @JsonProperty("serversRemoved") @Nullable Set<String> serversRemoved,
+        @JsonProperty("serversUnchanged") @Nullable Set<String> 
serversUnchanged,
+        @JsonProperty("serversGettingNewSegments") @Nullable Set<String> 
serversGettingNewSegments,
+        @JsonProperty("serverSegmentChangeInfo")
+        @Nullable Map<String, ServerSegmentChangeInfo> 
serverSegmentChangeInfo) {
+      _numServersGettingNewSegments = numServersGettingNewSegments;
+      _numServers = numServers;
+      _serversAdded = serversAdded;
+      _serversRemoved = serversRemoved;
+      _serversUnchanged = serversUnchanged;
+      _serversGettingNewSegments = serversGettingNewSegments;
+      _serverSegmentChangeInfo = serverSegmentChangeInfo;
+    }
+
+    @JsonProperty
+    public int getNumServersGettingNewSegments() {
+      return _numServersGettingNewSegments;
+    }
+
+    @JsonProperty
+    public RebalanceChangeInfo getNumServers() {
+      return _numServers;
+    }
+
+    @JsonProperty
+    public Set<String> getServersAdded() {
+      return _serversAdded;
+    }
+
+    @JsonProperty
+    public Set<String> getServersRemoved() {
+      return _serversRemoved;
+    }
+
+    @JsonProperty
+    public Set<String> getServersUnchanged() {
+      return _serversUnchanged;
+    }
+
+    @JsonProperty
+    public Set<String> getServersGettingNewSegments() {
+      return _serversGettingNewSegments;
+    }
+
+    @JsonProperty
+    public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
+      return _serverSegmentChangeInfo;
+    }
+  }
+
+  public static class SegmentInfo {
+    // TODO: Add a metric to estimate the total time it will take to rebalance
+    private final int _totalSegmentsToBeMoved;
+    private final int _maxSegmentsAddedToASingleServer;
+    private final long _estimatedAverageSegmentSizeInBytes;
+    private final long _totalEstimatedDataToBeMovedInBytes;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final RebalanceChangeInfo _replicationFactor;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final RebalanceChangeInfo _numSegmentsInSingleReplica;
+    @JsonInclude(JsonInclude.Include.NON_NULL)
+    private final RebalanceChangeInfo _numSegmentsAcrossAllReplicas;
+
+    /**
+     * Constructor for SegmentInfo
+     * @param totalSegmentsToBeMoved total number of segments to be moved as 
part of this rebalance
+     * @param maxSegmentsAddedToASingleServer maximum segments added to a 
single server as part of this rebalance
+     * @param estimatedAverageSegmentSizeInBytes estimated average size of 
segments in bytes
+     * @param totalEstimatedDataToBeMovedInBytes total estimated amount of 
data to be moved as part of this rebalance
+     * @param replicationFactor replication factor before and after this 
rebalance
+     * @param numSegmentsInSingleReplica number of segments in single replica 
before and after this rebalance
+     * @param numSegmentsAcrossAllReplicas total number of segments across all 
replicas before and after this rebalance
+     */
+    @JsonCreator
+    public SegmentInfo(@JsonProperty("totalSegmentsToBeMoved") int 
totalSegmentsToBeMoved,
+        @JsonProperty("maxSegmentsAddedToASingleServer") int 
maxSegmentsAddedToASingleServer,
+        @JsonProperty("estimatedAverageSegmentSizeInBytes") long 
estimatedAverageSegmentSizeInBytes,
+        @JsonProperty("totalEstimatedDataToBeMovedInBytes") long 
totalEstimatedDataToBeMovedInBytes,
+        @JsonProperty("replicationFactor") @Nullable RebalanceChangeInfo 
replicationFactor,
+        @JsonProperty("numSegmentsInSingleReplica") @Nullable 
RebalanceChangeInfo numSegmentsInSingleReplica,
+        @JsonProperty("numSegmentsAcrossAllReplicas") @Nullable 
RebalanceChangeInfo numSegmentsAcrossAllReplicas) {
+      _totalSegmentsToBeMoved = totalSegmentsToBeMoved;
+      _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer;
+      _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes;
+      _totalEstimatedDataToBeMovedInBytes = totalEstimatedDataToBeMovedInBytes;
+      _replicationFactor = replicationFactor;
+      _numSegmentsInSingleReplica = numSegmentsInSingleReplica;
+      _numSegmentsAcrossAllReplicas = numSegmentsAcrossAllReplicas;
+    }
+
+    @JsonProperty
+    public int getTotalSegmentsToBeMoved() {
+      return _totalSegmentsToBeMoved;
+    }
+
+    @JsonProperty
+    public int getMaxSegmentsAddedToASingleServer() {
+      return _maxSegmentsAddedToASingleServer;
+    }
+
+    @JsonProperty
+    public long getEstimatedAverageSegmentSizeInBytes() {
+      return _estimatedAverageSegmentSizeInBytes;
+    }
+
+    @JsonProperty
+    public long getTotalEstimatedDataToBeMovedInBytes() {
+      return _totalEstimatedDataToBeMovedInBytes;
+    }
+
+    @JsonProperty
+    public RebalanceChangeInfo getReplicationFactor() {
+      return _replicationFactor;
+    }
+
+    @JsonProperty
+    public RebalanceChangeInfo getNumSegmentsInSingleReplica() {
+      return _numSegmentsInSingleReplica;
+    }
+
+    @JsonProperty
+    public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() {
+      return _numSegmentsAcrossAllReplicas;
+    }
+  }
+

Review Comment:
   nit: I feel it would be a bit more readable to move these nested classes below the top-level
class's fields and methods, unless this ordering comes from the auto-formatter...



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -291,43 +305,58 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
               + "rebalance", rebalanceJobId, tableNameWithType), e);
       return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
           "Caught exception while calculating target assignment: " + e, 
instancePartitionsMap,
-          tierToInstancePartitionsMap, null, preChecksResult);
+          tierToInstancePartitionsMap, null, preChecksResult, null);
     }
 
     boolean segmentAssignmentUnchanged = 
currentAssignment.equals(targetAssignment);
     LOGGER.info("For rebalanceId: {}, instancePartitionsUnchanged: {}, 
tierInstancePartitionsUnchanged: {}, "
             + "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId, 
instancePartitionsUnchanged,
         tierInstancePartitionsUnchanged, segmentAssignmentUnchanged, 
tableNameWithType);
 
+    RebalanceSummaryResult summaryResult = null;
+    if (summary) {
+      summaryResult = calculateDryRunSummary(currentAssignment, 
targetAssignment, tableNameWithType, rebalanceJobId);
+    }

Review Comment:
   Makes sense. Perhaps add a comment to this if-branch so that others know its
purpose, and keep it here.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -559,6 +588,146 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
     }
   }
 
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    long tableSizePerReplicaInBytes = -1;
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for 
table {}!", tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    }
+    LOGGER.info("Fetching the table size for rebalance summary for table: {}", 
tableNameWithType);
+    try {
+      // TODO: Consider making the timeoutMs for fetching table size via table 
rebalancer configurable
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+      tableSizePerReplicaInBytes = 
tableSizeDetails._reportedSizePerReplicaInBytes;
+    } catch (InvalidConfigException e) {
+      LOGGER.error("Caught exception while trying to fetch table size details 
for table: {}", tableNameWithType, e);
+    }
+    LOGGER.info("Fetched the table size for rebalance summary for table: {}", 
tableNameWithType);
+    return tableSizePerReplicaInBytes;
+  }
+
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, 
Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String 
tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+    int existingReplicationFactor = 0;
+    int newReplicationFactor = 0;
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      existingReplicationFactor = entrySet.getValue().size();
+      for (String segmentKey : entrySet.getValue().keySet()) {
+        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new 
HashSet<>()).add(entrySet.getKey());
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      newReplicationFactor = entrySet.getValue().size();
+      for (String segmentKey : entrySet.getValue().keySet()) {
+        newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new 
HashSet<>()).add(entrySet.getKey());
+      }
+    }
+    RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
+        = new 
RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, 
newReplicationFactor);
+
+    int existingNumServers = existingServersToSegmentMap.size();
+    int newNumServers = newServersToSegmentMap.size();
+    RebalanceSummaryResult.RebalanceChangeInfo numServers
+        = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers, 
newNumServers);
+
+    List<InstanceConfig> instanceConfigs = _helixDataAccessor.getChildValues(
+        _helixDataAccessor.keyBuilder().instanceConfigs(), true);
+    Map<String, List<String>> instanceToTagsMap = new HashMap<>();
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceToTagsMap.put(instanceConfig.getInstanceName(), 
instanceConfig.getTags());
+    }
+
+    Set<String> serversAdded = new HashSet<>();
+    Set<String> serversRemoved = new HashSet<>();
+    Set<String> serversUnchanged = new HashSet<>();
+    Set<String> serversGettingNewSegments = new HashSet<>();
+    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> 
serverSegmentChangeInfoMap = new HashMap<>();
+    int segmentsNotMoved = 0;
+    int maxSegmentsAddedToServer = 0;
+    for (Map.Entry<String, Set<String>> entry : 
newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentSet = entry.getValue();
+      int totalNewSegments = segmentSet.size();
+
+      Set<String> newSegmentList = new HashSet<>(segmentSet);

Review Comment:
   nit: rename newSegmentList -> newSegmentSet or newSegments? (It is a `Set`, not a `List`.)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to