raghavyadav01 commented on code in PR #15266: URL: https://github.com/apache/pinot/pull/15266#discussion_r2021773303
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java: ########## @@ -18,35 +18,28 @@ */ package org.apache.pinot.controller.helix.core.rebalance; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; /** * These are rebalance stats as to how the current state is, when compared to the target state. * Eg: If the current has 4 segments whose replicas (16) don't match the target state, _segmentsToRebalance * is 4 and _replicasToRebalance is 16. */ +@JsonIgnoreProperties(ignoreUnknown = true) Review Comment: Is it safe to ignore unknown properties? Should this be an error? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -632,15 +651,15 @@ private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, St for (Map.Entry<String, Map<String, String>> entrySet : currentAssignment.entrySet()) { existingReplicationFactor = entrySet.getValue().size(); - for (String segmentKey : entrySet.getValue().keySet()) { - existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey()); + for (String instanceName : entrySet.getValue().keySet()) { + existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new HashSet<>()).add(entrySet.getKey()); } } for (Map.Entry<String, Map<String, String>> entrySet : targetAssignment.entrySet()) { newReplicationFactor = entrySet.getValue().size(); - for (String segmentKey : entrySet.getValue().keySet()) { - newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new HashSet<>()).add(entrySet.getKey()); + for (String instanceName : entrySet.getValue().keySet()) { Review Comment: Why change the variable name? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java: ########## @@ -68,31 +70,67 @@ public ZkBasedTableRebalanceObserver(String tableNameWithType, String rebalanceJ @Override public void onTrigger(Trigger trigger, Map<String, Map<String, String>> currentState, - Map<String, Map<String, String>> targetState) { + Map<String, Map<String, String>> targetState, RebalanceContext rebalanceContext) { boolean updatedStatsInZk = false; _controllerMetrics.setValueOfTableGauge(_tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 1); + TableRebalanceProgressStats.RebalanceStateStats latest; + TableRebalanceProgressStats.RebalanceProgressStats latestProgress; switch (trigger) { case START_TRIGGER: - updateOnStart(currentState, targetState); + updateOnStart(currentState, targetState, rebalanceContext); trackStatsInZk(); updatedStatsInZk = true; break; // Write to Zk if there's change since previous stats computation case IDEAL_STATE_CHANGE_TRIGGER: - TableRebalanceProgressStats.RebalanceStateStats latest = - getDifferenceBetweenTableRebalanceStates(targetState, currentState); + latest = getDifferenceBetweenTableRebalanceStates(targetState, currentState); + latestProgress = calculateOverallProgressStats(targetState, + currentState, rebalanceContext, Trigger.IDEAL_STATE_CHANGE_TRIGGER, _tableRebalanceProgressStats); if (TableRebalanceProgressStats.statsDiffer(_tableRebalanceProgressStats.getCurrentToTargetConvergence(), - latest)) { - _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest); + latest) || TableRebalanceProgressStats.progressStatsDiffer( + _tableRebalanceProgressStats.getRebalanceProgressStatsOverall(), latestProgress)) { + if (TableRebalanceProgressStats.statsDiffer( + _tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) { + _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest); + } + if 
(TableRebalanceProgressStats.progressStatsDiffer( + _tableRebalanceProgressStats.getRebalanceProgressStatsOverall(), latestProgress)) { + _tableRebalanceProgressStats.setRebalanceProgressStatsOverall(latestProgress); + } trackStatsInZk(); updatedStatsInZk = true; } break; case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER: latest = getDifferenceBetweenTableRebalanceStates(targetState, currentState); + latestProgress = calculateOverallProgressStats(targetState, + currentState, rebalanceContext, Trigger.EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER, + _tableRebalanceProgressStats); if (TableRebalanceProgressStats.statsDiffer( - _tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) { - _tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest); + _tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest) + || TableRebalanceProgressStats.progressStatsDiffer( + _tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep(), latestProgress)) { + if (TableRebalanceProgressStats.statsDiffer( + _tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) { + _tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest); + } + TableRebalanceProgressStats.RebalanceProgressStats lastStepStats = + _tableRebalanceProgressStats.getRebalanceProgressStatsCurrentStep(); + if (TableRebalanceProgressStats.progressStatsDiffer(lastStepStats, latestProgress)) { + _tableRebalanceProgressStats.setRebalanceProgressStatsOverall( + updateOverallProgressStatsFromStep(_tableRebalanceProgressStats, lastStepStats, latestProgress)); + _tableRebalanceProgressStats.setRebalanceProgressStatsCurrentStep(latestProgress); + } + trackStatsInZk(); + updatedStatsInZk = true; + } + break; + case NEXT_ASSINGMENT_CALCULATION_TRIGGER: + latestProgress = calculateOverallProgressStats(targetState, Review Comment: What is the impact of errors in trigger on stats calculation? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java: ########## @@ -68,31 +70,67 @@ public ZkBasedTableRebalanceObserver(String tableNameWithType, String rebalanceJ @Override public void onTrigger(Trigger trigger, Map<String, Map<String, String>> currentState, - Map<String, Map<String, String>> targetState) { + Map<String, Map<String, String>> targetState, RebalanceContext rebalanceContext) { boolean updatedStatsInZk = false; _controllerMetrics.setValueOfTableGauge(_tableNameWithType, ControllerGauge.TABLE_REBALANCE_IN_PROGRESS, 1); + TableRebalanceProgressStats.RebalanceStateStats latest; + TableRebalanceProgressStats.RebalanceProgressStats latestProgress; switch (trigger) { case START_TRIGGER: - updateOnStart(currentState, targetState); + updateOnStart(currentState, targetState, rebalanceContext); trackStatsInZk(); updatedStatsInZk = true; break; // Write to Zk if there's change since previous stats computation case IDEAL_STATE_CHANGE_TRIGGER: - TableRebalanceProgressStats.RebalanceStateStats latest = - getDifferenceBetweenTableRebalanceStates(targetState, currentState); + latest = getDifferenceBetweenTableRebalanceStates(targetState, currentState); + latestProgress = calculateOverallProgressStats(targetState, + currentState, rebalanceContext, Trigger.IDEAL_STATE_CHANGE_TRIGGER, _tableRebalanceProgressStats); if (TableRebalanceProgressStats.statsDiffer(_tableRebalanceProgressStats.getCurrentToTargetConvergence(), - latest)) { - _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest); + latest) || TableRebalanceProgressStats.progressStatsDiffer( + _tableRebalanceProgressStats.getRebalanceProgressStatsOverall(), latestProgress)) { + if (TableRebalanceProgressStats.statsDiffer( + _tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) { + _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest); Review Comment: Can the stats regress? 
If yes, is there a way to detect it? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -987,8 +1010,9 @@ private Pair<InstancePartitions, Boolean> getInstancePartitionsForTier(TableConf } } - private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean lowDiskMode, boolean bestEfforts, - Set<String> segmentsToMonitor, long externalViewCheckIntervalInMs, long externalViewStabilizationTimeoutInMs) + private IdealState waitForExternalViewToConverge(String tableNameWithType, boolean bestEfforts, Review Comment: Will the changes cause any issue in rolling upgrade? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java: ########## @@ -118,13 +129,107 @@ public RebalanceStateStats getCurrentToTargetConvergence() { return _currentToTargetConvergence; } + public RebalanceProgressStats getRebalanceProgressStatsOverall() { + return _rebalanceProgressStatsOverall; + } + + public RebalanceProgressStats getRebalanceProgressStatsCurrentStep() { + return _rebalanceProgressStatsCurrentStep; + } + + public static boolean progressStatsDiffer(RebalanceProgressStats base, RebalanceProgressStats compare) { + // Don't check for changes in the estimated time for completion as this will always be updated since it is + // newly calculated for each iteration based on current time and start time + return base._totalSegmentsToBeAdded != compare._totalSegmentsToBeAdded + || base._totalSegmentsToBeDeleted != compare._totalSegmentsToBeDeleted + || base._totalRemainingSegmentsToBeAdded != compare._totalRemainingSegmentsToBeAdded + || base._totalRemainingSegmentsToBeDeleted != compare._totalRemainingSegmentsToBeDeleted + || base._totalRemainingSegmentsToConverge != compare._totalRemainingSegmentsToConverge + || base._totalUniqueNewUntrackedSegmentsDuringRebalance + != compare._totalUniqueNewUntrackedSegmentsDuringRebalance + || 
base._percentageTotalSegmentsAddsRemaining != compare._percentageTotalSegmentsAddsRemaining + || base._percentageTotalSegmentDeletesRemaining != compare._percentageTotalSegmentDeletesRemaining + || base._averageSegmentSizeInBytes != compare._averageSegmentSizeInBytes + || base._totalEstimatedDataToBeMovedInBytes != compare._totalEstimatedDataToBeMovedInBytes Review Comment: Can the estimated data to be moved also vary between iterations, similar to the estimated time? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java: ########## @@ -567,6 +577,15 @@ private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig reb + "added/removed for each instance: {}", rebalanceJobId, tableNameWithType, SegmentAssignmentUtils.getNumSegmentsToMovePerInstance(currentAssignment, nextAssignment)); + // Record change of current ideal state and the next assignment + _tableRebalanceObserver.onTrigger(TableRebalanceObserver.Trigger.NEXT_ASSINGMENT_CALCULATION_TRIGGER, + currentAssignment, nextAssignment, rebalanceContext); + if (_tableRebalanceObserver.isStopped()) { Review Comment: In what scenario would the table rebalance stop before the IS update? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org