klsince commented on code in PR #15266:
URL: https://github.com/apache/pinot/pull/15266#discussion_r2027561893


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -118,6 +128,116 @@ public RebalanceStateStats getCurrentToTargetConvergence() {
     return _currentToTargetConvergence;
   }
 
+  public RebalanceProgressStats getRebalanceProgressStatsOverall() {
+    return _rebalanceProgressStatsOverall;
+  }
+
+  public RebalanceProgressStats getRebalanceProgressStatsCurrentStep() {
+    return _rebalanceProgressStatsCurrentStep;
+  }
+
+  /**
+   * Updates the overall and step progress stats based on the latest calculated step's progress stats. This should
+   * be called during the EV-IS convergence trigger to ensure the overall stats reflect the changes as they are made.
+   * @param latestStepStats latest step level stats calculated in this iteration
+   */
+  public void updateOverallAndStepStatsFromLatestStepStats(
+      TableRebalanceProgressStats.RebalanceProgressStats latestStepStats) {
+    TableRebalanceProgressStats.RebalanceProgressStats lastStepStats = getRebalanceProgressStatsCurrentStep();
+    TableRebalanceProgressStats.RebalanceProgressStats lastOverallStats = getRebalanceProgressStatsOverall();
+    int numAdditionalSegmentsAdded =
+        latestStepStats._totalSegmentsToBeAdded - lastStepStats._totalSegmentsToBeAdded;
+    int numAdditionalSegmentsDeleted =
+        latestStepStats._totalSegmentsToBeDeleted - lastStepStats._totalSegmentsToBeDeleted;
+    int upperBoundOnSegmentsAdded =

Review Comment:
   I find this method a bit hard to follow, perhaps because the variable names 
are quite similar (`last...` vs. `latest...`).
   Could you add some comments that split the method into a few major 
sections? That would make it easier to read.
   
   Btw, why do we need `upperBoundOnSegmentsAdded`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -402,9 +402,17 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb
       }
     }
 
+    List<String> segmentsToMove = SegmentAssignmentUtils.getSegmentsToMove(currentAssignment, targetAssignment);
+    Set<String> segmentsToMonitor = new HashSet<>(segmentsToMove);
+
+    long estimatedAverageSegmentSizeInBytes = summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes();
+    Set<String> allOriginalSegmentsIdealState = currentAssignment.keySet();

Review Comment:
   nit: `allOriginalSegmentsIdealState` -> `allSegmentsFromIdealState`, or is 
there a special reason to call out 'original'?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static 
TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) 
rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based 
on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats 
calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, 
Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, 
TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new 
HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && 
!segmentsToMonitor.contains(segmentName)) {

Review Comment:
   Hmm, sorry, I'm a bit confused about what this if-check is trying to do. It 
doesn't use `instanceName` or `instanceState`, so why is it inside the for-loop, 
and why is `segmentsToMonitor` checked here 🤔? Please shed some light, or 
perhaps add a few more comments to this part of the method to make it easier to 
understand. Thanks!
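
   To illustrate the question, here is a minimal sketch of the check hoisted 
out of the per-instance loop, since it depends only on `segmentName`. The class 
name `MonitorFilterSketch`, the `Counts` holder, and the `originalSegments` 
parameter (standing in for `rebalanceContext.getUniqueSegments()`) are 
hypothetical, and the state is compared against the literal "OFFLINE" instead 
of the Pinot constant. One behavioral difference to be aware of: a segment with 
an empty instance map would be counted here, whereas a check inside the inner 
loop never runs for it.

```java
import java.util.Map;
import java.util.Set;

public class MonitorFilterSketch {
  /** Hypothetical holder for the two counters computed by the loop. */
  public static final class Counts {
    public int totalSegmentsTarget;
    public int totalNewSegmentsNotMonitored;
  }

  public static Counts count(Map<String, Map<String, String>> targetAssignment,
      Set<String> originalSegments, Set<String> segmentsToMonitor) {
    Counts counts = new Counts();
    for (Map.Entry<String, Map<String, String>> segmentEntry : targetAssignment.entrySet()) {
      String segmentName = segmentEntry.getKey();
      boolean isNewSegment = !originalSegments.contains(segmentName);

      // Segment-level filter: evaluated once per segment, before looking at instances.
      if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
        if (isNewSegment) {
          // New segments that are not on the monitor list are counted but not tracked.
          counts.totalNewSegmentsNotMonitored++;
        }
        continue;
      }

      // Instance-level work: only reached for segments that are tracked.
      for (String instanceState : segmentEntry.getValue().values()) {
        if ("OFFLINE".equals(instanceState)) {
          // Replicas that are OFFLINE in the target assignment are skipped.
          continue;
        }
        counts.totalSegmentsTarget++;
      }
    }
    return counts;
  }
}
```

   Passing `null` for `segmentsToMonitor` disables the filter, matching the 
null check in the diff above.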



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static 
TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) 
rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based 
on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats 
calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, 
Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, 
TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new 
HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && 
!segmentsToMonitor.contains(segmentName)) {
+          if (newSegmentsNotExistingBefore.contains(segmentName)) {
+            // Don't track newly added segments unless they're on the monitor 
list
+            totalNewSegmentsNotMonitored++;
+            newSegmentsNotExistingBefore.remove(segmentName);
+          }
+          continue;
+        }
+        if (instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) {
+          // Skip tracking segments that are in OFFLINE state in the target 
assignment
+          targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k 
-> new HashSet<>()).add(segmentName);
+          continue;
+        }
+        totalSegmentsTarget += 1;
+        newServersToSegmentMap.computeIfAbsent(instanceName, k -> new 
HashSet<>()).add(segmentName);
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      for (String instanceName : entrySet.getValue().keySet()) {
+        if (segmentsToMonitor != null && 
!segmentsToMonitor.contains(segmentName)) {
+          continue;
+        }
+        if (targetInstanceToOfflineSegmentsMap.containsKey(instanceName)
+            && 
targetInstanceToOfflineSegmentsMap.get(instanceName).contains(segmentName)) {
+          // Skip tracking segments that are in OFFLINE state in the target 
assignment
+          continue;
+        }
+        existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new 
HashSet<>()).add(segmentName);
+      }
+    }
+
+    int segmentsNotMoved = 0;
+    int totalSegmentsToBeDeleted = 0;
+    int segmentsUnchangedYetNotConverged = 0;
+    for (Map.Entry<String, Set<String>> entry : 
newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentSet = entry.getValue();
+
+      Set<String> newSegmentSet = new HashSet<>(segmentSet);
+      Set<String> existingSegmentSet = new HashSet<>();
+      int segmentsUnchanged = 0;
+      if (existingServersToSegmentMap.containsKey(server)) {
+        Set<String> segmentSetForServer = 
existingServersToSegmentMap.get(server);
+        existingSegmentSet.addAll(segmentSetForServer);
+        Set<String> intersection = new HashSet<>(segmentSetForServer);
+        intersection.retainAll(newSegmentSet);
+        segmentsUnchanged = intersection.size();
+        segmentsNotMoved += segmentsUnchanged;
+
+        for (String segmentName : intersection) {
+          String currentInstanceState = 
currentAssignment.get(segmentName).get(server);
+          String targetInstanceState = 
targetAssignment.get(segmentName).get(server);
+          if (!currentInstanceState.equals(targetInstanceState)) {
+            segmentsUnchangedYetNotConverged++;
+          }
+        }
+      }
+      newSegmentSet.removeAll(existingSegmentSet);
+      totalSegmentsToBeDeleted += existingSegmentSet.size() - 
segmentsUnchanged;
+    }
+
+    for (Map.Entry<String, Set<String>> entry : 
existingServersToSegmentMap.entrySet()) {
+      if (!newServersToSegmentMap.containsKey(entry.getKey())) {
+        totalSegmentsToBeDeleted += entry.getValue().size();
+      }
+    }
+
+    int newSegsAddedInThisAssignment = 0;
+    int newSegsDeletedInThisAssignment = 0;
+    for (String segment : newSegmentsNotExistingBefore) {
+      Set<String> currentSegmentAssign = currentAssignment.get(segment) != null
+          ? currentAssignment.get(segment).keySet() : new HashSet<>();
+      Set<String> targetSegmentAssign = targetAssignment.get(segment) != null
+          ? targetAssignment.get(segment).keySet() : new HashSet<>();
+
+      Set<String> segmentsAdded = new HashSet<>(targetSegmentAssign);
+      segmentsAdded.removeAll(currentSegmentAssign);
+      newSegsAddedInThisAssignment += segmentsAdded.size();
+
+      Set<String> segmentsDeleted = new HashSet<>(currentSegmentAssign);
+      segmentsDeleted.removeAll(targetSegmentAssign);
+      newSegsDeletedInThisAssignment += segmentsDeleted.size();
+    }
+
+    int newNumberSegmentsTotal = totalSegmentsTarget;
+    int totalSegmentsToBeAdded = newNumberSegmentsTotal - segmentsNotMoved;
+
+    TableRebalanceProgressStats.RebalanceProgressStats progressStats =
+        new TableRebalanceProgressStats.RebalanceProgressStats();
+    switch (trigger) {
+      case START_TRIGGER:
+      case NEXT_ASSINGMENT_CALCULATION_TRIGGER:
+        // These are initialization steps for global / step progress stats
+        progressStats._totalSegmentsToBeAdded = totalSegmentsToBeAdded;
+        progressStats._totalSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+        progressStats._totalRemainingSegmentsToBeAdded = 
totalSegmentsToBeAdded;
+        progressStats._totalRemainingSegmentsToBeDeleted = 
totalSegmentsToBeDeleted;
+        progressStats._totalCarryOverSegmentsToBeAdded = 0;
+        progressStats._totalCarryOverSegmentsToBeDeleted = 0;
+        progressStats._totalRemainingSegmentsToConverge = 
segmentsUnchangedYetNotConverged;
+        progressStats._totalUniqueNewUntrackedSegmentsDuringRebalance = 
totalNewSegmentsNotMonitored;
+        progressStats._percentageRemainingSegmentsToBeAdded = 
totalSegmentsToBeAdded == 0 ? 0.0 : 100.0;
+        progressStats._percentageRemainingSegmentsToBeDeleted = 
totalSegmentsToBeDeleted == 0 ? 0.0 : 100.0;
+        progressStats._estimatedTimeToCompleteAddsInSeconds = 
totalSegmentsToBeAdded == 0 ? 0.0 : -1.0;
+        progressStats._estimatedTimeToCompleteDeletesInSeconds = 
totalSegmentsToBeDeleted == 0 ? 0.0 : -1.0;
+        progressStats._averageSegmentSizeInBytes = 
rebalanceContext.getEstimatedAverageSegmentSizeInBytes();
+        progressStats._totalEstimatedDataToBeMovedInBytes =
+            
TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes(0,
+                rebalanceContext.getEstimatedAverageSegmentSizeInBytes(), 
totalSegmentsToBeAdded);
+        progressStats._startTimeMs = trigger == 
Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER
+            ? System.currentTimeMillis() : 
rebalanceProgressStats.getStartTimeMs();
+        break;
+      case IDEAL_STATE_CHANGE_TRIGGER:
+      case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:

Review Comment:
   Given the two checks on `trigger == Trigger.IDEAL_STATE_CHANGE_TRIGGER` and 
`trigger == Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER`, perhaps 
just split them into two separate cases.
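
   A rough sketch of the shape I have in mind, with each trigger getting its 
own case instead of a shared case that re-checks `trigger` inside. The class 
`TriggerSwitchSketch` and the string labels are hypothetical placeholders for 
the real per-trigger logic, which is not reproduced here; the enum constants 
only mirror the names visible in the diff (including the existing spelling of 
`NEXT_ASSINGMENT_CALCULATION_TRIGGER`).

```java
public class TriggerSwitchSketch {
  /** Stand-in for the observer's Trigger enum; only the names seen in the diff. */
  public enum Trigger {
    START_TRIGGER,
    NEXT_ASSINGMENT_CALCULATION_TRIGGER,
    IDEAL_STATE_CHANGE_TRIGGER,
    EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER
  }

  /** Returns a label naming the branch taken; real code would update the stats instead. */
  public static String handle(Trigger trigger) {
    switch (trigger) {
      case START_TRIGGER:
      case NEXT_ASSINGMENT_CALCULATION_TRIGGER:
        // Initialization of the overall / step stats stays shared.
        return "init";
      case IDEAL_STATE_CHANGE_TRIGGER:
        // Logic specific to ideal state changes lives here, with no inner trigger check.
        return "ideal-state-change";
      case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
        // Logic specific to EV-IS convergence lives here.
        return "ev-is-convergence";
      default:
        throw new IllegalArgumentException("Unhandled trigger: " + trigger);
    }
  }
}
```

   Any code the two triggers genuinely share could move into a private helper 
called from both cases.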



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static 
TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) 
rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based 
on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats 
calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, 
Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, 
TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new 
HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && 
!segmentsToMonitor.contains(segmentName)) {
+          if (newSegmentsNotExistingBefore.contains(segmentName)) {
+            // Don't track newly added segments unless they're on the monitor 
list
+            totalNewSegmentsNotMonitored++;
+            newSegmentsNotExistingBefore.remove(segmentName);
+          }
+          continue;
+        }
+        if (instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) {
+          // Skip tracking segments that are in OFFLINE state in the target 
assignment
+          targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k 
-> new HashSet<>()).add(segmentName);
+          continue;
+        }
+        totalSegmentsTarget += 1;

Review Comment:
   nit: use `totalSegmentsTarget++`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

