somandal commented on code in PR #15266:
URL: https://github.com/apache/pinot/pull/15266#discussion_r2029417170


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,253 @@ public static 
TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) 
rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based 
on the trigger type
+   * @param targetAssignment target assignment (either updated IS or the 
target end IS depending on the step)
+   * @param currentAssignment current assignment (either EV or the current IS 
depending on the step)
+   * @param rebalanceContext rebalance context
+   * @param trigger reason to trigger the stats update
+   * @param rebalanceProgressStats current value of the rebalance progress 
stats, used to calculate the next
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats 
calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, 
Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, 
TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new 
HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    // Segments to monitor is the list of segments that are being moved as 
part of the table rebalance that the
+    // table rebalance intends to track convergence for. This list usually 
includes segments from the last next
+    // assignment IS update and the current IS update. The EV-IS convergence 
check only tracks convergence for the
+    // segments on this list, and if any additional segments are found that 
haven't converged they are ignored.
+    // From the stats perspective, we also only care about tracking the 
convergence of actual segments that the
+    // rebalance cares about, that is why we skip any segments that are not on 
this list.
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        // Don't track segments that are not on the rebalance monitor list
+        if (segmentsToMonitor != null && 
!segmentsToMonitor.contains(segmentName)) {
+          if (newSegmentsNotExistingBefore.contains(segmentName)) {
+            // Don't track newly added segments unless they're on the monitor 
list, remove them if they were added
+            // before
+            totalNewSegmentsNotMonitored++;
+            newSegmentsNotExistingBefore.remove(segmentName);
+          }
+          continue;
+        }
+        if 
(instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE))
 {
+          // Skip tracking segments that are in OFFLINE state in the target 
assignment
+          targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k 
-> new HashSet<>()).add(segmentName);
+          continue;
+        }
+        totalSegmentsTarget++;
+        newServersToSegmentMap.computeIfAbsent(instanceName, k -> new 
HashSet<>()).add(segmentName);
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      for (String instanceName : entrySet.getValue().keySet()) {
+        // Don't track segments that are not on the rebalance monitor list
+        if (segmentsToMonitor != null && 
!segmentsToMonitor.contains(segmentName)) {
+          continue;
+        }
+        if (targetInstanceToOfflineSegmentsMap.containsKey(instanceName)
+            && 
targetInstanceToOfflineSegmentsMap.get(instanceName).contains(segmentName)) {
+          // Skip tracking segments that are in OFFLINE state in the target 
assignment
+          continue;
+        }
+        existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new 
HashSet<>()).add(segmentName);
+      }
+    }
+
+    int segmentsNotMoved = 0;
+    int totalSegmentsToBeDeleted = 0;
+    int segmentsUnchangedYetNotConverged = 0;
+    for (Map.Entry<String, Set<String>> entry : 
newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentSet = entry.getValue();
+
+      Set<String> newSegmentSet = new HashSet<>(segmentSet);
+      Set<String> existingSegmentSet = new HashSet<>();
+      int segmentsUnchanged = 0;
+      if (existingServersToSegmentMap.containsKey(server)) {
+        Set<String> segmentSetForServer = 
existingServersToSegmentMap.get(server);
+        existingSegmentSet.addAll(segmentSetForServer);
+        Set<String> intersection = new HashSet<>(segmentSetForServer);
+        intersection.retainAll(newSegmentSet);
+        segmentsUnchanged = intersection.size();
+        segmentsNotMoved += segmentsUnchanged;
+
+        for (String segmentName : intersection) {
+          String currentInstanceState = 
currentAssignment.get(segmentName).get(server);
+          String targetInstanceState = 
targetAssignment.get(segmentName).get(server);
+          if (!currentInstanceState.equals(targetInstanceState)) {
+            segmentsUnchangedYetNotConverged++;
+          }
+        }
+      }
+      newSegmentSet.removeAll(existingSegmentSet);
+      totalSegmentsToBeDeleted += existingSegmentSet.size() - 
segmentsUnchanged;
+    }
+
+    for (Map.Entry<String, Set<String>> entry : 
existingServersToSegmentMap.entrySet()) {
+      if (!newServersToSegmentMap.containsKey(entry.getKey())) {
+        totalSegmentsToBeDeleted += entry.getValue().size();
+      }
+    }
+
+    int newSegsAddedInThisAssignment = 0;
+    int newSegsDeletedInThisAssignment = 0;
+    for (String segment : newSegmentsNotExistingBefore) {
+      Set<String> currentSegmentAssign = currentAssignment.get(segment) != null
+          ? currentAssignment.get(segment).keySet() : new HashSet<>();
+      Set<String> targetSegmentAssign = targetAssignment.get(segment) != null
+          ? targetAssignment.get(segment).keySet() : new HashSet<>();
+
+      Set<String> segmentsAdded = new HashSet<>(targetSegmentAssign);
+      segmentsAdded.removeAll(currentSegmentAssign);
+      newSegsAddedInThisAssignment += segmentsAdded.size();
+
+      Set<String> segmentsDeleted = new HashSet<>(currentSegmentAssign);
+      segmentsDeleted.removeAll(targetSegmentAssign);
+      newSegsDeletedInThisAssignment += segmentsDeleted.size();
+    }
+
+    int newNumberSegmentsTotal = totalSegmentsTarget;
+    int totalSegmentsToBeAdded = newNumberSegmentsTotal - segmentsNotMoved;
+
+    TableRebalanceProgressStats.RebalanceProgressStats existingProgressStats;
+    long startTimeMs;
+    TableRebalanceProgressStats.RebalanceProgressStats progressStats =
+        new TableRebalanceProgressStats.RebalanceProgressStats();
+    switch (trigger) {
+      case START_TRIGGER:
+      case NEXT_ASSINGMENT_CALCULATION_TRIGGER:
+        // These are initialization steps for global / step progress stats
+        progressStats._totalSegmentsToBeAdded = totalSegmentsToBeAdded;
+        progressStats._totalSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+        progressStats._totalRemainingSegmentsToBeAdded = 
totalSegmentsToBeAdded;
+        progressStats._totalRemainingSegmentsToBeDeleted = 
totalSegmentsToBeDeleted;
+        progressStats._totalCarryOverSegmentsToBeAdded = 0;
+        progressStats._totalCarryOverSegmentsToBeDeleted = 0;
+        progressStats._totalRemainingSegmentsToConverge = 
segmentsUnchangedYetNotConverged;
+        progressStats._totalUniqueNewUntrackedSegmentsDuringRebalance = 
totalNewSegmentsNotMonitored;
+        progressStats._percentageRemainingSegmentsToBeAdded = 
totalSegmentsToBeAdded == 0 ? 0.0 : 100.0;
+        progressStats._percentageRemainingSegmentsToBeDeleted = 
totalSegmentsToBeDeleted == 0 ? 0.0 : 100.0;
+        progressStats._estimatedTimeToCompleteAddsInSeconds = 
totalSegmentsToBeAdded == 0 ? 0.0 : -1.0;
+        progressStats._estimatedTimeToCompleteDeletesInSeconds = 
totalSegmentsToBeDeleted == 0 ? 0.0 : -1.0;
+        progressStats._averageSegmentSizeInBytes = 
rebalanceContext.getEstimatedAverageSegmentSizeInBytes();
+        progressStats._totalEstimatedDataToBeMovedInBytes =
+            
TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes(0,
+                rebalanceContext.getEstimatedAverageSegmentSizeInBytes(), 
totalSegmentsToBeAdded);
+        progressStats._startTimeMs = trigger == 
Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER
+            ? System.currentTimeMillis() : 
rebalanceProgressStats.getStartTimeMs();
+        break;
+      case IDEAL_STATE_CHANGE_TRIGGER:
+        existingProgressStats = 
rebalanceProgressStats.getRebalanceProgressStatsOverall();
+        progressStats._totalSegmentsToBeAdded =
+            existingProgressStats._totalSegmentsToBeAdded + 
newSegsAddedInThisAssignment;
+        progressStats._totalSegmentsToBeDeleted =
+            existingProgressStats._totalSegmentsToBeDeleted + 
newSegsDeletedInThisAssignment;
+        progressStats._totalRemainingSegmentsToBeAdded = 
totalSegmentsToBeAdded;
+        progressStats._totalRemainingSegmentsToBeDeleted = 
totalSegmentsToBeDeleted;
+        progressStats._totalCarryOverSegmentsToBeAdded = 0;
+        progressStats._totalCarryOverSegmentsToBeDeleted = 0;
+        progressStats._totalRemainingSegmentsToConverge = 
segmentsUnchangedYetNotConverged;
+        progressStats._totalUniqueNewUntrackedSegmentsDuringRebalance =
+            
existingProgressStats._totalUniqueNewUntrackedSegmentsDuringRebalance + 
totalNewSegmentsNotMonitored;

Review Comment:
   `segmentsToMonitor` is not the full list of all segments monitored by the TableRebalancer from start to end. It only captures the segments from the current step and the previous step.
   
   What's error-prone here? Can you elaborate on that?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to