somandal commented on code in PR #15266:
URL: https://github.com/apache/pinot/pull/15266#discussion_r2027903591


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {

Review Comment:
   I've added some comments about this. Basically, the logic in the Table
Rebalancer (`isExternalViewConverged`) only tracks a segment if it meets the
following conditions:
   - It is part of the `segmentsToMonitor` set, which is the list of all
segments that are moving as part of this rebalance step (or the previous one)
   - The segment is not in the OFFLINE state in the IS (IdealState)
   - For `lowDiskMode`, if the EV (ExternalView) deletion isn't completed yet,
etc.
   
   Here I likewise only compute stats for segments that are on the
`segmentsToMonitor` list as calculated by the table rebalancer. That's why I
skip a segment for the stats when it is not on the monitor list. Hope that
makes sense.
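
   To make the filtering concrete, here is a minimal standalone sketch of the first two conditions only (monitor-list membership and a non-OFFLINE target state). The class `MonitorFilterSketch`, the method `countTrackedReplicas`, and the sample segment/server names are hypothetical and are not part of this PR; the `lowDiskMode` condition is not modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; not the code in ZkBasedTableRebalanceObserver.
public class MonitorFilterSketch {
  static final String OFFLINE = "OFFLINE";

  /**
   * Counts the target replicas that would be tracked: the segment must be on the
   * monitor list (a null list means "track everything") and the replica's target
   * state must not be OFFLINE.
   */
  static int countTrackedReplicas(Map<String, Map<String, String>> targetAssignment,
      Set<String> segmentsToMonitor) {
    int tracked = 0;
    for (Map.Entry<String, Map<String, String>> segmentEntry : targetAssignment.entrySet()) {
      String segmentName = segmentEntry.getKey();
      if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
        // Not on the monitor list: skip the whole segment
        continue;
      }
      for (String instanceState : segmentEntry.getValue().values()) {
        if (!OFFLINE.equals(instanceState)) {
          tracked++;
        }
      }
    }
    return tracked;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> target = new HashMap<>();
    target.put("seg1", Map.of("server1", "ONLINE", "server2", "ONLINE"));
    target.put("seg2", Map.of("server1", "OFFLINE"));
    target.put("seg3", Map.of("server3", "ONLINE"));

    // seg3 is not monitored and seg2 is OFFLINE, so only seg1's two replicas count
    System.out.println(countTrackedReplicas(target, Set.of("seg1", "seg2"))); // prints 2
    // With no monitor list, every non-OFFLINE replica counts
    System.out.println(countTrackedReplicas(target, null)); // prints 3
  }
}
```

   The null check mirrors the `segmentsToMonitor != null` guard in the diff above: when no monitor list is available, nothing is filtered out by segment name.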



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+          if (newSegmentsNotExistingBefore.contains(segmentName)) {
+            // Don't track newly added segments unless they're on the monitor list
+            totalNewSegmentsNotMonitored++;
+            newSegmentsNotExistingBefore.remove(segmentName);
+          }
+          continue;
+        }
+        if (instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) {
+          // Skip tracking segments that are in OFFLINE state in the target assignment
+          targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+          continue;
+        }
+        totalSegmentsTarget += 1;
+        newServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+      }
+    }
+
+    for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      for (String instanceName : entrySet.getValue().keySet()) {
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+          continue;
+        }
+        if (targetInstanceToOfflineSegmentsMap.containsKey(instanceName)
+            && targetInstanceToOfflineSegmentsMap.get(instanceName).contains(segmentName)) {
+          // Skip tracking segments that are in OFFLINE state in the target assignment
+          continue;
+        }
+        existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+      }
+    }
+
+    int segmentsNotMoved = 0;
+    int totalSegmentsToBeDeleted = 0;
+    int segmentsUnchangedYetNotConverged = 0;
+    for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> segmentSet = entry.getValue();
+
+      Set<String> newSegmentSet = new HashSet<>(segmentSet);
+      Set<String> existingSegmentSet = new HashSet<>();
+      int segmentsUnchanged = 0;
+      if (existingServersToSegmentMap.containsKey(server)) {
+        Set<String> segmentSetForServer = existingServersToSegmentMap.get(server);
+        existingSegmentSet.addAll(segmentSetForServer);
+        Set<String> intersection = new HashSet<>(segmentSetForServer);
+        intersection.retainAll(newSegmentSet);
+        segmentsUnchanged = intersection.size();
+        segmentsNotMoved += segmentsUnchanged;
+
+        for (String segmentName : intersection) {
+          String currentInstanceState = currentAssignment.get(segmentName).get(server);
+          String targetInstanceState = targetAssignment.get(segmentName).get(server);
+          if (!currentInstanceState.equals(targetInstanceState)) {
+            segmentsUnchangedYetNotConverged++;
+          }
+        }
+      }
+      newSegmentSet.removeAll(existingSegmentSet);
+      totalSegmentsToBeDeleted += existingSegmentSet.size() - segmentsUnchanged;
+    }
+
+    for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) {
+      if (!newServersToSegmentMap.containsKey(entry.getKey())) {
+        totalSegmentsToBeDeleted += entry.getValue().size();
+      }
+    }
+
+    int newSegsAddedInThisAssignment = 0;
+    int newSegsDeletedInThisAssignment = 0;
+    for (String segment : newSegmentsNotExistingBefore) {
+      Set<String> currentSegmentAssign = currentAssignment.get(segment) != null
+          ? currentAssignment.get(segment).keySet() : new HashSet<>();
+      Set<String> targetSegmentAssign = targetAssignment.get(segment) != null
+          ? targetAssignment.get(segment).keySet() : new HashSet<>();
+
+      Set<String> segmentsAdded = new HashSet<>(targetSegmentAssign);
+      segmentsAdded.removeAll(currentSegmentAssign);
+      newSegsAddedInThisAssignment += segmentsAdded.size();
+
+      Set<String> segmentsDeleted = new HashSet<>(currentSegmentAssign);
+      segmentsDeleted.removeAll(targetSegmentAssign);
+      newSegsDeletedInThisAssignment += segmentsDeleted.size();
+    }
+
+    int newNumberSegmentsTotal = totalSegmentsTarget;
+    int totalSegmentsToBeAdded = newNumberSegmentsTotal - segmentsNotMoved;
+
+    TableRebalanceProgressStats.RebalanceProgressStats progressStats =
+        new TableRebalanceProgressStats.RebalanceProgressStats();
+    switch (trigger) {
+      case START_TRIGGER:
+      case NEXT_ASSINGMENT_CALCULATION_TRIGGER:
+        // These are initialization steps for global / step progress stats
+        progressStats._totalSegmentsToBeAdded = totalSegmentsToBeAdded;
+        progressStats._totalSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+        progressStats._totalRemainingSegmentsToBeAdded = totalSegmentsToBeAdded;
+        progressStats._totalRemainingSegmentsToBeDeleted = totalSegmentsToBeDeleted;
+        progressStats._totalCarryOverSegmentsToBeAdded = 0;
+        progressStats._totalCarryOverSegmentsToBeDeleted = 0;
+        progressStats._totalRemainingSegmentsToConverge = segmentsUnchangedYetNotConverged;
+        progressStats._totalUniqueNewUntrackedSegmentsDuringRebalance = totalNewSegmentsNotMonitored;
+        progressStats._percentageRemainingSegmentsToBeAdded = totalSegmentsToBeAdded == 0 ? 0.0 : 100.0;
+        progressStats._percentageRemainingSegmentsToBeDeleted = totalSegmentsToBeDeleted == 0 ? 0.0 : 100.0;
+        progressStats._estimatedTimeToCompleteAddsInSeconds = totalSegmentsToBeAdded == 0 ? 0.0 : -1.0;
+        progressStats._estimatedTimeToCompleteDeletesInSeconds = totalSegmentsToBeDeleted == 0 ? 0.0 : -1.0;
+        progressStats._averageSegmentSizeInBytes = rebalanceContext.getEstimatedAverageSegmentSizeInBytes();
+        progressStats._totalEstimatedDataToBeMovedInBytes =
+            TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes(0,
+                rebalanceContext.getEstimatedAverageSegmentSizeInBytes(), totalSegmentsToBeAdded);
+        progressStats._startTimeMs = trigger == Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER
+            ? System.currentTimeMillis() : rebalanceProgressStats.getStartTimeMs();
+        break;
+      case IDEAL_STATE_CHANGE_TRIGGER:
+      case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:

Review Comment:
   done



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -284,4 +319,204 @@ public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetwe
         (totalSegments == 0) ? 0 : ((double) rebalanceStats._segmentsToRebalance / totalSegments) * 100.0;
     return rebalanceStats;
   }
+
+  /**
+   * Calculates the progress stats for the given step or for the overall based on the trigger type
+   * @return the calculated step or progress stats
+   */
+  @VisibleForTesting
+  static TableRebalanceProgressStats.RebalanceProgressStats calculateUpdatedProgressStats(
+      Map<String, Map<String, String>> targetAssignment, Map<String, Map<String, String>> currentAssignment,
+      RebalanceContext rebalanceContext, Trigger trigger, TableRebalanceProgressStats rebalanceProgressStats) {
+    Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> targetInstanceToOfflineSegmentsMap = new HashMap<>();
+    Set<String> newSegmentsNotExistingBefore = new HashSet<>();
+
+    Set<String> segmentsToMonitor = rebalanceContext.getSegmentsToMonitor();
+    int totalNewSegmentsNotMonitored = 0;
+    int totalSegmentsTarget = 0;
+    for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) {
+      String segmentName = entrySet.getKey();
+      if (!rebalanceContext.getUniqueSegments().contains(segmentName)) {
+        newSegmentsNotExistingBefore.add(segmentName);
+      }
+      for (Map.Entry<String, String> entry : entrySet.getValue().entrySet()) {
+        String instanceName = entry.getKey();
+        String instanceState = entry.getValue();
+        if (segmentsToMonitor != null && !segmentsToMonitor.contains(segmentName)) {
+          if (newSegmentsNotExistingBefore.contains(segmentName)) {
+            // Don't track newly added segments unless they're on the monitor list
+            totalNewSegmentsNotMonitored++;
+            newSegmentsNotExistingBefore.remove(segmentName);
+          }
+          continue;
+        }
+        if (instanceState.equals(CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE)) {
+          // Skip tracking segments that are in OFFLINE state in the target assignment
+          targetInstanceToOfflineSegmentsMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(segmentName);
+          continue;
+        }
+        totalSegmentsTarget += 1;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

