This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c3e118f1fa Fix up elapsed time calculation for rebalance progress stats when carry over are present (#15674)
c3e118f1fa is described below

commit c3e118f1fa737acde84e50b23d8c3c1d1979a5b6
Author: Sonam Mandal <sonam.man...@startree.ai>
AuthorDate: Wed Apr 30 22:25:12 2025 -0700

    Fix up elapsed time calculation for rebalance progress stats when carry over are present (#15674)
    
    * Fix up elapsed time calculation for rebalance progress stats when carry over
    
    * Fix extra commented line
---
 .../rebalance/TableRebalanceProgressStats.java     |   4 +-
 .../rebalance/ZkBasedTableRebalanceObserver.java   |   4 +-
 .../TestZkBasedTableRebalanceObserver.java         | 125 +++++++++++++++++++++
 3 files changed, 130 insertions(+), 3 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
index 050052e83a..47d223804f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java
@@ -242,7 +242,9 @@ public class TableRebalanceProgressStats {
       int remainingSegmentsToChange) {
     double elapsedTimeInSeconds = (double) (System.currentTimeMillis() - 
startTime) / 1000.0;
     int segmentsAlreadyChanged = totalSegmentsToChange - 
remainingSegmentsToChange;
-    return segmentsAlreadyChanged == 0 ? totalSegmentsToChange == 0 ? 0.0 : 
-1.0
+    // If carry over + remaining segments to change are > total segments to change then number of segments already
+    // changed may be -ve, in which case we should just set the default value as we cannot measure elapsed time
+    return segmentsAlreadyChanged <= 0 ? totalSegmentsToChange == 0 ? 0.0 : 
-1.0
         : (double) remainingSegmentsToChange / (double) segmentsAlreadyChanged 
* elapsedTimeInSeconds;
   }
 
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 125b0568bc..5eaa80e91d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -581,10 +581,10 @@ public class ZkBasedTableRebalanceObserver implements 
TableRebalanceObserver {
         startTimeMs = existingProgressStats._startTimeMs;
         progressStats._estimatedTimeToCompleteAddsInSeconds =
             
TableRebalanceProgressStats.calculateEstimatedTimeToCompleteChange(startTimeMs,
-                progressStats._totalSegmentsToBeAdded, 
progressStats._totalRemainingSegmentsToBeAdded);
+                progressStats._totalSegmentsToBeAdded, totalSegmentsToBeAdded);
         progressStats._estimatedTimeToCompleteDeletesInSeconds =
             
TableRebalanceProgressStats.calculateEstimatedTimeToCompleteChange(startTimeMs,
-                progressStats._totalSegmentsToBeDeleted, 
progressStats._totalRemainingSegmentsToBeDeleted);
+                progressStats._totalSegmentsToBeDeleted, 
totalSegmentsToBeDeleted);
         progressStats._averageSegmentSizeInBytes = 
existingProgressStats._averageSegmentSizeInBytes;
         progressStats._totalEstimatedDataToBeMovedInBytes =
             
TableRebalanceProgressStats.calculateNewEstimatedDataToBeMovedInBytes(
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index 0d2f395d79..7ae66a7b30 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -1293,4 +1293,129 @@ public class TestZkBasedTableRebalanceObserver {
 
     assertEquals(current, target);
   }
+
+  @Test
+  void testElapsedTimeWithCarryOverProgressStats() {
+    long estimatedAverageSegmentSize = 1024;
+
+    Map<String, Map<String, String>> currentIS = new TreeMap<>();
+    currentIS.put("segment1", 
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1"), ONLINE));
+    currentIS.put("segment2", 
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2"), ONLINE));
+    currentIS.put("segment3", 
SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host4"), ONLINE));
+
+    // Assume that the current EV doesn't yet have the segments added as seen 
in currentIS
+    Map<String, Map<String, String>> currentEV = new TreeMap<>();
+
+    Map<String, Map<String, String>> targetIS = new TreeMap<>();
+    targetIS.put("segment1",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host2", 
"host3", "host4", "host5"), ONLINE));
+    targetIS.put("segment2",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host4", "host5"), ONLINE));
+    targetIS.put("segment3",
+        SegmentAssignmentUtils.getInstanceStateMap(Arrays.asList("host1", 
"host3", "host4", "host5"), ONLINE));
+
+    TableRebalanceProgressStats tableRebalanceProgressStats = new 
TableRebalanceProgressStats();
+
+    // Initialize the start trigger with some change
+    Set<String> segmentSet = new HashSet<>(targetIS.keySet());
+    TableRebalanceObserver.RebalanceContext rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(
+        estimatedAverageSegmentSize, segmentSet, segmentSet);
+    TableRebalanceProgressStats.RebalanceProgressStats stats =
+        ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(targetIS, 
currentIS, rebalanceContext,
+            TableRebalanceObserver.Trigger.START_TRIGGER, 
tableRebalanceProgressStats);
+    assertEquals(stats._totalSegmentsToBeAdded, 11);
+    assertEquals(stats._totalSegmentsToBeDeleted, 2);
+    assertEquals(stats._totalRemainingSegmentsToBeAdded, 11);
+    assertEquals(stats._totalRemainingSegmentsToBeDeleted, 2);
+    assertEquals(stats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(stats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(stats._totalRemainingSegmentsToConverge, 0);
+    assertEquals(stats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0);
+    assertEquals(stats._percentageRemainingSegmentsToBeAdded, 100.0);
+    assertEquals(stats._percentageRemainingSegmentsToBeDeleted, 100.0);
+    assertEquals(stats._estimatedTimeToCompleteAddsInSeconds, -1.0);
+    assertEquals(stats._estimatedTimeToCompleteDeletesInSeconds, -1.0);
+    assertEquals(stats._averageSegmentSizeInBytes, 
estimatedAverageSegmentSize);
+    assertEquals(stats._totalEstimatedDataToBeMovedInBytes, 
estimatedAverageSegmentSize * 11);
+    tableRebalanceProgressStats.setRebalanceProgressStatsOverall(stats);
+
+    // Next call EV-IS convergence (assume here that the IS is not yet updated like happens in actual rebalance)
+    // Since currentEV does not match currentIS, IS-EV convergence will detect some segments are carry over segments
+    // and wait for these to converge before moving on to the first IS update
+    // Also validate that the overall progress stats shows elapsed time as -1.0 instead of some random -ve time
+    segmentSet = new HashSet<>(targetIS.keySet());
+    rebalanceContext = new 
TableRebalanceObserver.RebalanceContext(estimatedAverageSegmentSize, 
segmentSet, segmentSet);
+    stats = 
ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(currentIS, 
currentEV, rebalanceContext,
+        
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+        tableRebalanceProgressStats);
+    
tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(stats);
+    stats = tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep();
+    assertEquals(stats._totalSegmentsToBeAdded, 0);
+    assertEquals(stats._totalSegmentsToBeDeleted, 0);
+    assertEquals(stats._totalRemainingSegmentsToBeAdded, 0);
+    assertEquals(stats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(stats._totalCarryOverSegmentsToBeAdded, 3);
+    assertEquals(stats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(stats._totalRemainingSegmentsToConverge, 0);
+    assertEquals(stats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0);
+    assertEquals(stats._percentageRemainingSegmentsToBeAdded, 0.0);
+    assertEquals(stats._percentageRemainingSegmentsToBeDeleted, 0.0);
+    assertEquals(stats._estimatedTimeToCompleteAddsInSeconds, 0.0);
+    assertEquals(stats._estimatedTimeToCompleteDeletesInSeconds, 0.0);
+    assertEquals(stats._averageSegmentSizeInBytes, 0);
+    assertEquals(stats._totalEstimatedDataToBeMovedInBytes, 0);
+    TableRebalanceProgressStats.RebalanceProgressStats overallStats =
+        tableRebalanceProgressStats.getRebalanceProgressStatsOverall();
+    assertEquals(overallStats._totalSegmentsToBeAdded, 11);
+    assertEquals(overallStats._totalSegmentsToBeDeleted, 2);
+    assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 11);
+    assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 2);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 3);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalRemainingSegmentsToConverge, 0);
+    assertEquals(overallStats._totalUniqueNewUntrackedSegmentsDuringRebalance, 
0);
+    assertEquals(overallStats._percentageRemainingSegmentsToBeAdded, 
127.27272727272727);
+    assertEquals(overallStats._percentageRemainingSegmentsToBeDeleted, 100.0);
+    assertEquals(overallStats._estimatedTimeToCompleteAddsInSeconds, -1.0);
+    assertEquals(overallStats._estimatedTimeToCompleteDeletesInSeconds, -1.0);
+    assertEquals(overallStats._averageSegmentSizeInBytes, 
estimatedAverageSegmentSize);
+    assertEquals(overallStats._totalEstimatedDataToBeMovedInBytes, 
estimatedAverageSegmentSize * 11);
+
+    // currentEV has converged to match currentIS, no more carry over segments 
should be seen
+    currentEV = new TreeMap<>(currentIS);
+    stats = 
ZkBasedTableRebalanceObserver.calculateUpdatedProgressStats(currentIS, 
currentEV, rebalanceContext,
+        
TableRebalanceObserver.Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+        tableRebalanceProgressStats);
+    
tableRebalanceProgressStats.updateOverallAndStepStatsFromLatestStepStats(stats);
+    stats = tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep();
+    assertEquals(stats._totalSegmentsToBeAdded, 0);
+    assertEquals(stats._totalSegmentsToBeDeleted, 0);
+    assertEquals(stats._totalRemainingSegmentsToBeAdded, 0);
+    assertEquals(stats._totalRemainingSegmentsToBeDeleted, 0);
+    assertEquals(stats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(stats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(stats._totalRemainingSegmentsToConverge, 0);
+    assertEquals(stats._totalUniqueNewUntrackedSegmentsDuringRebalance, 0);
+    assertEquals(stats._percentageRemainingSegmentsToBeAdded, 0.0);
+    assertEquals(stats._percentageRemainingSegmentsToBeDeleted, 0.0);
+    assertEquals(stats._estimatedTimeToCompleteAddsInSeconds, 0.0);
+    assertEquals(stats._estimatedTimeToCompleteDeletesInSeconds, 0.0);
+    assertEquals(stats._averageSegmentSizeInBytes, 0);
+    assertEquals(stats._totalEstimatedDataToBeMovedInBytes, 0);
+    overallStats = 
tableRebalanceProgressStats.getRebalanceProgressStatsOverall();
+    assertEquals(overallStats._totalSegmentsToBeAdded, 11);
+    assertEquals(overallStats._totalSegmentsToBeDeleted, 2);
+    assertEquals(overallStats._totalRemainingSegmentsToBeAdded, 11);
+    assertEquals(overallStats._totalRemainingSegmentsToBeDeleted, 2);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeAdded, 0);
+    assertEquals(overallStats._totalCarryOverSegmentsToBeDeleted, 0);
+    assertEquals(overallStats._totalRemainingSegmentsToConverge, 0);
+    assertEquals(overallStats._totalUniqueNewUntrackedSegmentsDuringRebalance, 
0);
+    assertEquals(overallStats._percentageRemainingSegmentsToBeAdded, 100.0);
+    assertEquals(overallStats._percentageRemainingSegmentsToBeDeleted, 100.0);
+    assertEquals(overallStats._estimatedTimeToCompleteAddsInSeconds, -1.0);
+    assertEquals(overallStats._estimatedTimeToCompleteDeletesInSeconds, -1.0);
+    assertEquals(overallStats._averageSegmentSizeInBytes, 
estimatedAverageSegmentSize);
+    assertEquals(overallStats._totalEstimatedDataToBeMovedInBytes, 
estimatedAverageSegmentSize * 11);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to