raghavyadav01 commented on code in PR #15266:
URL: https://github.com/apache/pinot/pull/15266#discussion_r2021773303


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -18,35 +18,28 @@
  */
 package org.apache.pinot.controller.helix.core.rebalance;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 
 
 /**
  * These are rebalance stats as to how the current state is, when compared to 
the target state.
  * Eg: If the current has 4 segments whose replicas (16) don't match the 
target state, _segmentsToRebalance
  * is 4 and _replicasToRebalance is 16.
  */
+@JsonIgnoreProperties(ignoreUnknown = true)

Review Comment:
   Is it safe to ignore unknown properties? Should encountering one be treated as an error instead?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -632,15 +651,15 @@ private RebalanceSummaryResult 
calculateDryRunSummary(Map<String, Map<String, St
 
     for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
       existingReplicationFactor = entrySet.getValue().size();
-      for (String segmentKey : entrySet.getValue().keySet()) {
-        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new 
HashSet<>()).add(entrySet.getKey());
+      for (String instanceName : entrySet.getValue().keySet()) {
+        existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new 
HashSet<>()).add(entrySet.getKey());
       }
     }
 
     for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
       newReplicationFactor = entrySet.getValue().size();
-      for (String segmentKey : entrySet.getValue().keySet()) {
-        newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new 
HashSet<>()).add(entrySet.getKey());
+      for (String instanceName : entrySet.getValue().keySet()) {

Review Comment:
   Why change the variable name? 



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -68,31 +70,67 @@ public ZkBasedTableRebalanceObserver(String 
tableNameWithType, String rebalanceJ
 
   @Override
   public void onTrigger(Trigger trigger, Map<String, Map<String, String>> 
currentState,
-      Map<String, Map<String, String>> targetState) {
+      Map<String, Map<String, String>> targetState, RebalanceContext 
rebalanceContext) {
     boolean updatedStatsInZk = false;
     _controllerMetrics.setValueOfTableGauge(_tableNameWithType, 
ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 1);
+    TableRebalanceProgressStats.RebalanceStateStats latest;
+    TableRebalanceProgressStats.RebalanceProgressStats latestProgress;
     switch (trigger) {
       case START_TRIGGER:
-        updateOnStart(currentState, targetState);
+        updateOnStart(currentState, targetState, rebalanceContext);
         trackStatsInZk();
         updatedStatsInZk = true;
         break;
       // Write to Zk if there's change since previous stats computation
       case IDEAL_STATE_CHANGE_TRIGGER:
-        TableRebalanceProgressStats.RebalanceStateStats latest =
-            getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        latest = getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        latestProgress = calculateOverallProgressStats(targetState,
+            currentState, rebalanceContext, 
Trigger.IDEAL_STATE_CHANGE_TRIGGER, _tableRebalanceProgressStats);
         if 
(TableRebalanceProgressStats.statsDiffer(_tableRebalanceProgressStats.getCurrentToTargetConvergence(),
-            latest)) {
-          _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
+            latest) || TableRebalanceProgressStats.progressStatsDiffer(
+            _tableRebalanceProgressStats.getRebalanceProgressStatsOverall(), 
latestProgress)) {
+          if (TableRebalanceProgressStats.statsDiffer(
+              
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) 
{
+            _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
+          }
+          if (TableRebalanceProgressStats.progressStatsDiffer(
+              _tableRebalanceProgressStats.getRebalanceProgressStatsOverall(), 
latestProgress)) {
+            
_tableRebalanceProgressStats.setRebalanceProgressStatsOverall(latestProgress);
+          }
           trackStatsInZk();
           updatedStatsInZk = true;
         }
         break;
       case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
         latest = getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        latestProgress = calculateOverallProgressStats(targetState,
+            currentState, rebalanceContext, 
Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER,
+            _tableRebalanceProgressStats);
         if (TableRebalanceProgressStats.statsDiffer(
-            
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) 
{
-          
_tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest);
+            
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)
+            || TableRebalanceProgressStats.progressStatsDiffer(
+                
_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep(), 
latestProgress)) {
+          if (TableRebalanceProgressStats.statsDiffer(
+              
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) 
{
+            
_tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest);
+          }
+          TableRebalanceProgressStats.RebalanceProgressStats lastStepStats =
+              
_tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep();
+          if (TableRebalanceProgressStats.progressStatsDiffer(lastStepStats, 
latestProgress)) {
+            _tableRebalanceProgressStats.setRebalanceProgressStatsOverall(
+                
updateOverallProgressStatsFromStep(_tableRebalanceProgressStats, lastStepStats, 
latestProgress));
+            
_tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep(latestProgress);
+          }
+          trackStatsInZk();
+          updatedStatsInZk = true;
+        }
+        break;
+      case NEXT_ASSINGMENT_CALCULATION_TRIGGER:
+        latestProgress = calculateOverallProgressStats(targetState,

Review Comment:
   What is the impact on the stats calculation if an error occurs while handling a trigger?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -68,31 +70,67 @@ public ZkBasedTableRebalanceObserver(String 
tableNameWithType, String rebalanceJ
 
   @Override
   public void onTrigger(Trigger trigger, Map<String, Map<String, String>> 
currentState,
-      Map<String, Map<String, String>> targetState) {
+      Map<String, Map<String, String>> targetState, RebalanceContext 
rebalanceContext) {
     boolean updatedStatsInZk = false;
     _controllerMetrics.setValueOfTableGauge(_tableNameWithType, 
ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 1);
+    TableRebalanceProgressStats.RebalanceStateStats latest;
+    TableRebalanceProgressStats.RebalanceProgressStats latestProgress;
     switch (trigger) {
       case START_TRIGGER:
-        updateOnStart(currentState, targetState);
+        updateOnStart(currentState, targetState, rebalanceContext);
         trackStatsInZk();
         updatedStatsInZk = true;
         break;
       // Write to Zk if there's change since previous stats computation
       case IDEAL_STATE_CHANGE_TRIGGER:
-        TableRebalanceProgressStats.RebalanceStateStats latest =
-            getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        latest = getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        latestProgress = calculateOverallProgressStats(targetState,
+            currentState, rebalanceContext, 
Trigger.IDEAL_STATE_CHANGE_TRIGGER, _tableRebalanceProgressStats);
         if 
(TableRebalanceProgressStats.statsDiffer(_tableRebalanceProgressStats.getCurrentToTargetConvergence(),
-            latest)) {
-          _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
+            latest) || TableRebalanceProgressStats.progressStatsDiffer(
+            _tableRebalanceProgressStats.getRebalanceProgressStatsOverall(), 
latestProgress)) {
+          if (TableRebalanceProgressStats.statsDiffer(
+              
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) 
{
+            _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);

Review Comment:
   Can the stats regress? If so, is there a way to detect it?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -987,8 +1010,9 @@ private Pair<InstancePartitions, Boolean> 
getInstancePartitionsForTier(TableConf
     }
   }
 
-  private IdealState waitForExternalViewToConverge(String tableNameWithType, 
boolean lowDiskMode, boolean bestEfforts,
-      Set<String> segmentsToMonitor, long externalViewCheckIntervalInMs, long 
externalViewStabilizationTimeoutInMs)
+  private IdealState waitForExternalViewToConverge(String tableNameWithType, 
boolean bestEfforts,

Review Comment:
   Will these changes cause any issues during a rolling upgrade?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -118,13 +129,107 @@ public RebalanceStateStats 
getCurrentToTargetConvergence() {
     return _currentToTargetConvergence;
   }
 
+  public RebalanceProgressStats getRebalanceProgressStatsOverall() {
+    return _rebalanceProgressStatsOverall;
+  }
+
+  public RebalanceProgressStats getRebalanceProgressStatsCurrentStep() {
+    return _rebalanceProgressStatsCurrentStep;
+  }
+
+  public static boolean progressStatsDiffer(RebalanceProgressStats base, 
RebalanceProgressStats compare) {
+    // Don't check for changes in the estimated time for completion as this 
will always be updated since it is
+    // newly calculated for each iteration based on current time and start time
+    return base._totalSegmentsToBeAdded != compare._totalSegmentsToBeAdded
+        || base._totalSegmentsToBeDeleted != compare._totalSegmentsToBeDeleted
+        || base._totalRemainingSegmentsToBeAdded != 
compare._totalRemainingSegmentsToBeAdded
+        || base._totalRemainingSegmentsToBeDeleted != 
compare._totalRemainingSegmentsToBeDeleted
+        || base._totalRemainingSegmentsToConverge != 
compare._totalRemainingSegmentsToConverge
+        || base._totalUniqueNewUntrackedSegmentsDuringRebalance
+        != compare._totalUniqueNewUntrackedSegmentsDuringRebalance
+        || base._percentageTotalSegmentsAddsRemaining != 
compare._percentageTotalSegmentsAddsRemaining
+        || base._percentageTotalSegmentDeletesRemaining != 
compare._percentageTotalSegmentDeletesRemaining
+        || base._averageSegmentSizeInBytes != 
compare._averageSegmentSizeInBytes
+        || base._totalEstimatedDataToBeMovedInBytes != 
compare._totalEstimatedDataToBeMovedInBytes

Review Comment:
   Can the estimated data to be moved also vary, similar to the estimated time?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -567,6 +577,15 @@ private RebalanceResult doRebalance(TableConfig 
tableConfig, RebalanceConfig reb
               + "added/removed for each instance: {}", rebalanceJobId, 
tableNameWithType,
           
SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, 
nextAssignment));
 
+      // Record change of current ideal state and the next assignment
+      
_tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER,
+          currentAssignment, nextAssignment, rebalanceContext);
+      if (_tableRebalanceObserver.isStopped()) {

Review Comment:
   In what scenario would the table rebalance stop before the ideal state (IS) is updated?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to