klsince commented on code in PR #15050: URL: https://github.com/apache/pinot/pull/15050#discussion_r1962208544
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -559,6 +588,146 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb } } + private long calculateTableSizePerReplicaInBytes(String tableNameWithType) { + long tableSizePerReplicaInBytes = -1; + if (_tableSizeReader == null) { + LOGGER.warn("tableSizeReader is null, cannot calculate table size for table {}!", tableNameWithType); + return tableSizePerReplicaInBytes; + } + LOGGER.info("Fetching the table size for rebalance summary for table: {}", tableNameWithType); + try { + // TODO: Consider making the timeoutMs for fetching table size via table rebalancer configurable + TableSizeReader.TableSubTypeSizeDetails tableSizeDetails = + _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000); + tableSizePerReplicaInBytes = tableSizeDetails._reportedSizePerReplicaInBytes; + } catch (InvalidConfigException e) { + LOGGER.error("Caught exception while trying to fetch table size details for table: {}", tableNameWithType, e); + } + LOGGER.info("Fetched the table size for rebalance summary for table: {}", tableNameWithType); Review Comment: add `tableSizePerReplicaInBytes` in the LOG.info msg too ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java: ########## @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.annotation.Nullable; + + +/** + * Holds the summary data of the rebalance result + */ +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonInclude(JsonInclude.Include.NON_NULL) +public class RebalanceSummaryResult { + + public static class ServerSegmentChangeInfo { + private final ServerStatus _serverStatus; + private final int _totalSegmentsAfterRebalance; + private final int _totalSegmentsBeforeRebalance; + private final int _segmentsAdded; + private final int _segmentsDeleted; + private final int _segmentsUnchanged; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final List<String> _tagList; + + /** + * Constructor for ServerSegmentChangeInfo + * @param serverStatus server status, whether it was added, removed, or unchanged as part of this rebalance + * @param totalSegmentsAfterRebalance expected total segments on this server after rebalance + * @param totalSegmentsBeforeRebalance current number of segments on this server before rebalance + * @param segmentsAdded number of segments expected to be added as part of this rebalance + * @param segmentsDeleted number of segments expected to be deleted as part of this rebalance + * @param segmentsUnchanged number of segments 
that aren't moving from this server as part of this rebalance + * @param tagList server tag list + */ + @JsonCreator + public ServerSegmentChangeInfo(@JsonProperty("serverStatus") ServerStatus serverStatus, + @JsonProperty("totalSegmentsAfterRebalance") int totalSegmentsAfterRebalance, + @JsonProperty("totalSegmentsBeforeRebalance") int totalSegmentsBeforeRebalance, + @JsonProperty("segmentsAdded") int segmentsAdded, @JsonProperty("segmentsDeleted") int segmentsDeleted, + @JsonProperty("segmentsUnchanged") int segmentsUnchanged, + @JsonProperty("tagList") @Nullable List<String> tagList) { + _serverStatus = serverStatus; + _totalSegmentsAfterRebalance = totalSegmentsAfterRebalance; + _totalSegmentsBeforeRebalance = totalSegmentsBeforeRebalance; + _segmentsAdded = segmentsAdded; + _segmentsDeleted = segmentsDeleted; + _segmentsUnchanged = segmentsUnchanged; + _tagList = tagList; + } + + @JsonProperty + public ServerStatus getServerStatus() { + return _serverStatus; + } + + @JsonProperty + public int getTotalSegmentsAfterRebalance() { + return _totalSegmentsAfterRebalance; + } + + @JsonProperty + public int getTotalSegmentsBeforeRebalance() { + return _totalSegmentsBeforeRebalance; + } + + @JsonProperty + public int getSegmentsAdded() { + return _segmentsAdded; + } + + @JsonProperty + public int getSegmentsDeleted() { + return _segmentsDeleted; + } + + @JsonProperty + public int getSegmentsUnchanged() { + return _segmentsUnchanged; + } + + @JsonProperty + public List<String> getTagList() { + return _tagList; + } + } + + public static class RebalanceChangeInfo { + private final int _valueBeforeRebalance; + private final int _expectedValueAfterRebalance; + + /** + * Constructor for RebalanceChangeInfo + * @param valueBeforeRebalance current value before rebalance + * @param expectedValueAfterRebalance expected value after rebalance + */ + @JsonCreator + public RebalanceChangeInfo(@JsonProperty("valueBeforeRebalance") int valueBeforeRebalance, + 
@JsonProperty("expectedValueAfterRebalance") int expectedValueAfterRebalance) { + _valueBeforeRebalance = valueBeforeRebalance; + _expectedValueAfterRebalance = expectedValueAfterRebalance; + } + + @JsonProperty + public int getValueBeforeRebalance() { + return _valueBeforeRebalance; + } + + @JsonProperty + public int getExpectedValueAfterRebalance() { + return _expectedValueAfterRebalance; + } + } + + public static class ServerInfo { + private final int _numServersGettingNewSegments; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final RebalanceChangeInfo _numServers; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Set<String> _serversAdded; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Set<String> _serversRemoved; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Set<String> _serversUnchanged; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Set<String> _serversGettingNewSegments; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final Map<String, ServerSegmentChangeInfo> _serverSegmentChangeInfo; + + /** + * Constructor for ServerInfo + * @param numServersGettingNewSegments total number of servers receiving new segments as part of this rebalance + * @param numServers number of servers before and after this rebalance + * @param serversAdded set of servers getting added as part of this rebalance + * @param serversRemoved set of servers getting removed as part of this rebalance + * @param serversUnchanged set of servers existing both before and as part of this rebalance + * @param serversGettingNewSegments set of servers getting segments added + * @param serverSegmentChangeInfo per server statistics for this rebalance + */ + @JsonCreator + public ServerInfo(@JsonProperty("numServersGettingNewSegments") int numServersGettingNewSegments, + @JsonProperty("numServers") @Nullable RebalanceChangeInfo numServers, + @JsonProperty("serversAdded") @Nullable Set<String> serversAdded, + 
@JsonProperty("serversRemoved") @Nullable Set<String> serversRemoved, + @JsonProperty("serversUnchanged") @Nullable Set<String> serversUnchanged, + @JsonProperty("serversGettingNewSegments") @Nullable Set<String> serversGettingNewSegments, + @JsonProperty("serverSegmentChangeInfo") + @Nullable Map<String, ServerSegmentChangeInfo> serverSegmentChangeInfo) { + _numServersGettingNewSegments = numServersGettingNewSegments; + _numServers = numServers; + _serversAdded = serversAdded; + _serversRemoved = serversRemoved; + _serversUnchanged = serversUnchanged; + _serversGettingNewSegments = serversGettingNewSegments; + _serverSegmentChangeInfo = serverSegmentChangeInfo; + } + + @JsonProperty + public int getNumServersGettingNewSegments() { + return _numServersGettingNewSegments; + } + + @JsonProperty + public RebalanceChangeInfo getNumServers() { + return _numServers; + } + + @JsonProperty + public Set<String> getServersAdded() { + return _serversAdded; + } + + @JsonProperty + public Set<String> getServersRemoved() { + return _serversRemoved; + } + + @JsonProperty + public Set<String> getServersUnchanged() { + return _serversUnchanged; + } + + @JsonProperty + public Set<String> getServersGettingNewSegments() { + return _serversGettingNewSegments; + } + + @JsonProperty + public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() { + return _serverSegmentChangeInfo; + } + } + + public static class SegmentInfo { + // TODO: Add a metric to estimate the total time it will take to rebalance + private final int _totalSegmentsToBeMoved; + private final int _maxSegmentsAddedToASingleServer; + private final long _estimatedAverageSegmentSizeInBytes; + private final long _totalEstimatedDataToBeMovedInBytes; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final RebalanceChangeInfo _replicationFactor; + @JsonInclude(JsonInclude.Include.NON_NULL) + private final RebalanceChangeInfo _numSegmentsInSingleReplica; + @JsonInclude(JsonInclude.Include.NON_NULL) + private 
final RebalanceChangeInfo _numSegmentsAcrossAllReplicas; + + /** + * Constructor for SegmentInfo + * @param totalSegmentsToBeMoved total number of segments to be moved as part of this rebalance + * @param maxSegmentsAddedToASingleServer maximum segments added to a single server as part of this rebalance + * @param estimatedAverageSegmentSizeInBytes estimated average size of segments in bytes + * @param totalEstimatedDataToBeMovedInBytes total estimated amount of data to be moved as part of this rebalance + * @param replicationFactor replication factor before and after this rebalance + * @param numSegmentsInSingleReplica number of segments in single replica before and after this rebalance + * @param numSegmentsAcrossAllReplicas total number of segments across all replicas before and after this rebalance + */ + @JsonCreator + public SegmentInfo(@JsonProperty("totalSegmentsToBeMoved") int totalSegmentsToBeMoved, + @JsonProperty("maxSegmentsAddedToASingleServer") int maxSegmentsAddedToASingleServer, + @JsonProperty("estimatedAverageSegmentSizeInBytes") long estimatedAverageSegmentSizeInBytes, + @JsonProperty("totalEstimatedDataToBeMovedInBytes") long totalEstimatedDataToBeMovedInBytes, + @JsonProperty("replicationFactor") @Nullable RebalanceChangeInfo replicationFactor, + @JsonProperty("numSegmentsInSingleReplica") @Nullable RebalanceChangeInfo numSegmentsInSingleReplica, + @JsonProperty("numSegmentsAcrossAllReplicas") @Nullable RebalanceChangeInfo numSegmentsAcrossAllReplicas) { + _totalSegmentsToBeMoved = totalSegmentsToBeMoved; + _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer; + _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes; + _totalEstimatedDataToBeMovedInBytes = totalEstimatedDataToBeMovedInBytes; + _replicationFactor = replicationFactor; + _numSegmentsInSingleReplica = numSegmentsInSingleReplica; + _numSegmentsAcrossAllReplicas = numSegmentsAcrossAllReplicas; + } + + @JsonProperty + public int 
getTotalSegmentsToBeMoved() { + return _totalSegmentsToBeMoved; + } + + @JsonProperty + public int getMaxSegmentsAddedToASingleServer() { + return _maxSegmentsAddedToASingleServer; + } + + @JsonProperty + public long getEstimatedAverageSegmentSizeInBytes() { + return _estimatedAverageSegmentSizeInBytes; + } + + @JsonProperty + public long getTotalEstimatedDataToBeMovedInBytes() { + return _totalEstimatedDataToBeMovedInBytes; + } + + @JsonProperty + public RebalanceChangeInfo getReplicationFactor() { + return _replicationFactor; + } + + @JsonProperty + public RebalanceChangeInfo getNumSegmentsInSingleReplica() { + return _numSegmentsInSingleReplica; + } + + @JsonProperty + public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() { + return _numSegmentsAcrossAllReplicas; + } + } + Review Comment: nit: I feel it's a bit more readable to move those sub classes below the top class fields and methods, unless it's auto-formatted... ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -291,43 +305,58 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb + "rebalance", rebalanceJobId, tableNameWithType), e); return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED, "Caught exception while calculating target assignment: " + e, instancePartitionsMap, - tierToInstancePartitionsMap, null, preChecksResult); + tierToInstancePartitionsMap, null, preChecksResult, null); } boolean segmentAssignmentUnchanged = currentAssignment.equals(targetAssignment); LOGGER.info("For rebalanceId: {}, instancePartitionsUnchanged: {}, tierInstancePartitionsUnchanged: {}, " + "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId, instancePartitionsUnchanged, tierInstancePartitionsUnchanged, segmentAssignmentUnchanged, tableNameWithType); + RebalanceSummaryResult summaryResult = null; + if (summary) { + summaryResult = calculateDryRunSummary(currentAssignment, 
targetAssignment, tableNameWithType, rebalanceJobId); + } Review Comment: Makes sense. Perhaps add a comment on this if-branch so that others know its purpose and keep it here. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -559,6 +588,146 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb } } + private long calculateTableSizePerReplicaInBytes(String tableNameWithType) { + long tableSizePerReplicaInBytes = -1; + if (_tableSizeReader == null) { + LOGGER.warn("tableSizeReader is null, cannot calculate table size for table {}!", tableNameWithType); + return tableSizePerReplicaInBytes; + } + LOGGER.info("Fetching the table size for rebalance summary for table: {}", tableNameWithType); + try { + // TODO: Consider making the timeoutMs for fetching table size via table rebalancer configurable + TableSizeReader.TableSubTypeSizeDetails tableSizeDetails = + _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000); + tableSizePerReplicaInBytes = tableSizeDetails._reportedSizePerReplicaInBytes; + } catch (InvalidConfigException e) { + LOGGER.error("Caught exception while trying to fetch table size details for table: {}", tableNameWithType, e); + } + LOGGER.info("Fetched the table size for rebalance summary for table: {}", tableNameWithType); + return tableSizePerReplicaInBytes; + } + + private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, String>> currentAssignment, + Map<String, Map<String, String>> targetAssignment, String tableNameWithType, String rebalanceJobId) { + LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}", + tableNameWithType, rebalanceJobId); + int existingReplicationFactor = 0; + int newReplicationFactor = 0; + Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>(); + Map<String, Set<String>> newServersToSegmentMap = new HashMap<>(); + + for (Map.Entry<String, Map<String, 
String>> entrySet : currentAssignment.entrySet()) { + existingReplicationFactor = entrySet.getValue().size(); + for (String segmentKey : entrySet.getValue().keySet()) { + existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey()); + } + } + + for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) { + newReplicationFactor = entrySet.getValue().size(); + for (String segmentKey : entrySet.getValue().keySet()) { + newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey()); + } + } + RebalanceSummaryResult.RebalanceChangeInfo replicationFactor + = new RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, newReplicationFactor); + + int existingNumServers = existingServersToSegmentMap.size(); + int newNumServers = newServersToSegmentMap.size(); + RebalanceSummaryResult.RebalanceChangeInfo numServers + = new RebalanceSummaryResult.RebalanceChangeInfo(existingNumServers, newNumServers); + + List<InstanceConfig> instanceConfigs = _helixDataAccessor.getChildValues( + _helixDataAccessor.keyBuilder().instanceConfigs(), true); + Map<String, List<String>> instanceToTagsMap = new HashMap<>(); + for (InstanceConfig instanceConfig : instanceConfigs) { + instanceToTagsMap.put(instanceConfig.getInstanceName(), instanceConfig.getTags()); + } + + Set<String> serversAdded = new HashSet<>(); + Set<String> serversRemoved = new HashSet<>(); + Set<String> serversUnchanged = new HashSet<>(); + Set<String> serversGettingNewSegments = new HashSet<>(); + Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>(); + int segmentsNotMoved = 0; + int maxSegmentsAddedToServer = 0; + for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) { + String server = entry.getKey(); + Set<String> segmentSet = entry.getValue(); + int totalNewSegments = segmentSet.size(); + + Set<String> newSegmentList = new 
HashSet<>(segmentSet); Review Comment: nit: newSegmentList -> newSegmentSet or newSegments? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org