Jackie-Jiang commented on code in PR #10359:
URL: https://github.com/apache/pinot/pull/10359#discussion_r1139900620


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java:
##########
@@ -25,6 +25,12 @@ public class RebalanceConfigConstants {
   private RebalanceConfigConstants() {
   }
 
+  // Unique Id for rebalance
+  public static final String JOB_ID = "rebalanceId";

Review Comment:
   ```suggestion
     public static final String JOB_ID = "jobId";
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * These are rebalance stats as to how the current state is, when compared to 
the target state.
+ * Eg: If the current has 4 segments whose replicas (16) don't match the 
target state, _segmentsToRebalance
+ * is 4 and _replicasToRebalance is 16.
+ */
+public class TableRebalanceProgressStats {
+  public static class RebalanceStateStats {
+    public int _segmentsMissing;
+    public int _segmentsToRebalance;
+    public double _percentSegmentsToRebalance;
+    public int _replicasToRebalance;
+
+    RebalanceStateStats() {
+      _segmentsMissing = 0;
+      _segmentsToRebalance = 0;
+      _replicasToRebalance = 0;
+      _percentSegmentsToRebalance = 0.0;
+    }
+  }
+
+  // Done/In_progress/Failed
+  private String _status;
+  // When did Rebalance start
+  private Long _startTimeInMilliseconds;

Review Comment:
   Should we store primitive `long`? I see the caller use the value to 
calculate the time difference. If it is `null`, it will throw NPE. Also we 
usually use `Ms` as the shorter name for `InMilliseconds`
   ```suggestion
     private long _startTimeMs;
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * These are rebalance stats as to how the current state is, when compared to 
the target state.
+ * Eg: If the current has 4 segments whose replicas (16) don't match the 
target state, _segmentsToRebalance
+ * is 4 and _replicasToRebalance is 16.
+ */
+public class TableRebalanceProgressStats {
+  public static class RebalanceStateStats {
+    public int _segmentsMissing;
+    public int _segmentsToRebalance;
+    public double _percentSegmentsToRebalance;
+    public int _replicasToRebalance;
+
+    RebalanceStateStats() {
+      _segmentsMissing = 0;
+      _segmentsToRebalance = 0;
+      _replicasToRebalance = 0;
+      _percentSegmentsToRebalance = 0.0;
+    }
+  }
+
+  // Done/In_progress/Failed
+  private String _status;
+  // When did Rebalance start
+  private Long _startTimeInMilliseconds;
+  // How long did rebalance take
+  private Long _timeToFinishInSeconds;
+  // Success/failure message
+  private String _completionStatusMsg;
+  @JsonProperty("initialToTargetStateConvergence")
+  private RebalanceStateStats _initialToTargetStateConvergence;
+  @JsonProperty("currentToTargetConvergence")
+  private RebalanceStateStats _currentToTargetConvergence;
+  @JsonProperty("externalViewToIdealStateConvergence")
+  private RebalanceStateStats _externalViewToIdealStateConvergence;
+
+  public TableRebalanceProgressStats() {
+    _currentToTargetConvergence = new RebalanceStateStats();
+    _externalViewToIdealStateConvergence = new RebalanceStateStats();
+    _initialToTargetStateConvergence = new RebalanceStateStats();
+  }
+
+  public void setStatus(String status) {
+    _status = status;
+  }
+
+  public void setInitialToTargetStateConvergence(RebalanceStateStats 
initialToTargetStateConvergence) {
+    _initialToTargetStateConvergence = initialToTargetStateConvergence;
+  }
+
+  public void setStartTimeInMilliseconds(Long startTimeInMilliseconds) {
+    _startTimeInMilliseconds = startTimeInMilliseconds;
+  }
+
+  public void setTimeToFinishInSeconds(Long timeToFinishInSeconds) {
+    _timeToFinishInSeconds = timeToFinishInSeconds;
+  }

Review Comment:
   Use primitive `long`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * These are rebalance stats as to how the current state is, when compared to 
the target state.
+ * Eg: If the current has 4 segments whose replicas (16) don't match the 
target state, _segmentsToRebalance
+ * is 4 and _replicasToRebalance is 16.
+ */
+public class TableRebalanceProgressStats {
+  public static class RebalanceStateStats {
+    public int _segmentsMissing;
+    public int _segmentsToRebalance;
+    public double _percentSegmentsToRebalance;
+    public int _replicasToRebalance;
+
+    RebalanceStateStats() {
+      _segmentsMissing = 0;
+      _segmentsToRebalance = 0;
+      _replicasToRebalance = 0;
+      _percentSegmentsToRebalance = 0.0;
+    }
+  }
+
+  // Done/In_progress/Failed
+  private String _status;
+  // When did Rebalance start
+  private Long _startTimeInMilliseconds;
+  // How long did rebalance take
+  private Long _timeToFinishInSeconds;
+  // Success/failure message
+  private String _completionStatusMsg;
+  @JsonProperty("initialToTargetStateConvergence")
+  private RebalanceStateStats _initialToTargetStateConvergence;
+  @JsonProperty("currentToTargetConvergence")
+  private RebalanceStateStats _currentToTargetConvergence;
+  @JsonProperty("externalViewToIdealStateConvergence")
+  private RebalanceStateStats _externalViewToIdealStateConvergence;
+
+  public TableRebalanceProgressStats() {
+    _currentToTargetConvergence = new RebalanceStateStats();
+    _externalViewToIdealStateConvergence = new RebalanceStateStats();
+    _initialToTargetStateConvergence = new RebalanceStateStats();
+  }
+
+  public void setStatus(String status) {
+    _status = status;
+  }
+
+  public void setInitialToTargetStateConvergence(RebalanceStateStats 
initialToTargetStateConvergence) {
+    _initialToTargetStateConvergence = initialToTargetStateConvergence;
+  }
+
+  public void setStartTimeInMilliseconds(Long startTimeInMilliseconds) {
+    _startTimeInMilliseconds = startTimeInMilliseconds;
+  }
+
+  public void setTimeToFinishInSeconds(Long timeToFinishInSeconds) {
+    _timeToFinishInSeconds = timeToFinishInSeconds;
+  }
+
+  public void setExternalViewToIdealStateConvergence(RebalanceStateStats 
externalViewToIdealStateConvergence) {
+    _externalViewToIdealStateConvergence = externalViewToIdealStateConvergence;
+  }
+
+  public void setCurrentToTargetConvergence(RebalanceStateStats 
currentToTargetConvergence) {
+    _currentToTargetConvergence = currentToTargetConvergence;
+  }
+
+  public void setCompletionStatusMsg(String completionStatusMsg) {
+    _completionStatusMsg = completionStatusMsg;
+  }
+
+  public String getStatus() {
+    return _status;
+  }
+
+  public String getCompletionStatusMsg() {
+    return _completionStatusMsg;
+  }
+
+  public RebalanceStateStats getInitialToTargetStateConvergence() {
+    return _initialToTargetStateConvergence;
+  }
+
+  public double getStartTimeInMilliseconds() {

Review Comment:
   ```suggestion
     public long getStartTimeMs() {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * These are rebalance stats as to how the current state is, when compared to 
the target state.
+ * Eg: If the current has 4 segments whose replicas (16) don't match the 
target state, _segmentsToRebalance
+ * is 4 and _replicasToRebalance is 16.
+ */
+public class TableRebalanceProgressStats {
+  public static class RebalanceStateStats {
+    public int _segmentsMissing;
+    public int _segmentsToRebalance;
+    public double _percentSegmentsToRebalance;
+    public int _replicasToRebalance;
+
+    RebalanceStateStats() {
+      _segmentsMissing = 0;
+      _segmentsToRebalance = 0;
+      _replicasToRebalance = 0;
+      _percentSegmentsToRebalance = 0.0;
+    }
+  }
+
+  // Done/In_progress/Failed
+  private String _status;
+  // When did Rebalance start
+  private Long _startTimeInMilliseconds;
+  // How long did rebalance take
+  private Long _timeToFinishInSeconds;
+  // Success/failure message
+  private String _completionStatusMsg;
+  @JsonProperty("initialToTargetStateConvergence")
+  private RebalanceStateStats _initialToTargetStateConvergence;
+  @JsonProperty("currentToTargetConvergence")
+  private RebalanceStateStats _currentToTargetConvergence;
+  @JsonProperty("externalViewToIdealStateConvergence")
+  private RebalanceStateStats _externalViewToIdealStateConvergence;
+
+  public TableRebalanceProgressStats() {
+    _currentToTargetConvergence = new RebalanceStateStats();
+    _externalViewToIdealStateConvergence = new RebalanceStateStats();
+    _initialToTargetStateConvergence = new RebalanceStateStats();
+  }
+
+  public void setStatus(String status) {
+    _status = status;
+  }
+
+  public void setInitialToTargetStateConvergence(RebalanceStateStats 
initialToTargetStateConvergence) {
+    _initialToTargetStateConvergence = initialToTargetStateConvergence;
+  }
+
+  public void setStartTimeInMilliseconds(Long startTimeInMilliseconds) {
+    _startTimeInMilliseconds = startTimeInMilliseconds;
+  }
+
+  public void setTimeToFinishInSeconds(Long timeToFinishInSeconds) {
+    _timeToFinishInSeconds = timeToFinishInSeconds;
+  }
+
+  public void setExternalViewToIdealStateConvergence(RebalanceStateStats 
externalViewToIdealStateConvergence) {
+    _externalViewToIdealStateConvergence = externalViewToIdealStateConvergence;
+  }
+
+  public void setCurrentToTargetConvergence(RebalanceStateStats 
currentToTargetConvergence) {
+    _currentToTargetConvergence = currentToTargetConvergence;
+  }
+
+  public void setCompletionStatusMsg(String completionStatusMsg) {
+    _completionStatusMsg = completionStatusMsg;
+  }
+
+  public String getStatus() {
+    return _status;
+  }
+
+  public String getCompletionStatusMsg() {
+    return _completionStatusMsg;
+  }
+
+  public RebalanceStateStats getInitialToTargetStateConvergence() {
+    return _initialToTargetStateConvergence;
+  }
+
+  public double getStartTimeInMilliseconds() {
+    return _startTimeInMilliseconds;
+  }
+
+  public Long getTimeToFinishInSeconds() {

Review Comment:
   return primitive `long`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>ZkBasedTableRebalanceObserver</code> observes rebalance progress and 
tracks rebalance status,
+ * stats in Zookeeper. This will be used to show the progress of rebalance to 
users via rebalanceStatus API.
+ */
+public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ZkBasedTableRebalanceObserver.class);
+  private final String _tableNameWithType;
+  private final String _rebalanceJobId;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private TableRebalanceProgressStats _tableRebalanceProgressStats;
+  // Keep track of number of updates. Useful during debugging.
+  private int _numUpdatesToZk;
+
+  public ZkBasedTableRebalanceObserver(String tableNameWithType, String 
rebalanceJobId,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    Preconditions.checkState(tableNameWithType != null, "Table name cannot be 
null");
+    Preconditions.checkState(rebalanceJobId != null, "rebalanceId cannot be 
null");
+    Preconditions.checkState(pinotHelixResourceManager != null, 
"PinotHelixManager cannot be null");
+    _tableNameWithType = tableNameWithType;
+    _rebalanceJobId = rebalanceJobId;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableRebalanceProgressStats = new TableRebalanceProgressStats();
+    _numUpdatesToZk = 0;
+  }
+
+  @Override
+  public void onTrigger(Trigger trigger, Map<String, Map<String, String>> 
currentState,
+      Map<String, Map<String, String>> targetState) {
+    switch (trigger) {
+      case START_TRIGGER:
+        updateOnStart(currentState, targetState);
+        trackStatsInZk();
+        break;
+      // Write to Zk if there's change since previous stats computation
+      case IDEAL_STATE_CHANGE_TRIGGER:
+        TableRebalanceProgressStats.RebalanceStateStats latest =
+            getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        if 
(TableRebalanceProgressStats.statsDiffer(_tableRebalanceProgressStats.getCurrentToTargetConvergence(),
+            latest)) {
+          _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
+          trackStatsInZk();
+        }
+        break;
+      case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
+        latest = getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        if (TableRebalanceProgressStats.statsDiffer(
+            
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) 
{
+          
_tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest);
+          trackStatsInZk();
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unimplemented trigger: " + 
trigger);
+    }
+  }
+
+  private void updateOnStart(Map<String, Map<String, String>> currentState,
+      Map<String, Map<String, String>> targetState) {
+    Preconditions.checkState(_tableRebalanceProgressStats.getStatus() != 
RebalanceResult.Status.IN_PROGRESS.toString(),
+        "Rebalance Observer onStart called multiple times");
+    
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS.toString());
+    _tableRebalanceProgressStats.setInitialToTargetStateConvergence(
+        getDifferenceBetweenTableRebalanceStates(targetState, currentState));
+    
_tableRebalanceProgressStats.setStartTimeInMilliseconds(System.currentTimeMillis());
+  }
+
+  @Override
+  public void onSuccess(String msg) {
+    Preconditions.checkState(_tableRebalanceProgressStats.getStatus() != 
RebalanceResult.Status.DONE.toString(),
+        "Table Rebalance already completed");
+    Long timeToFinishInSeconds =

Review Comment:
   Primitive `long`, same for other places



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceProgressStats.java:
##########
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+
+/**
+ * These are rebalance stats as to how the current state is, when compared to 
the target state.
+ * Eg: If the current has 4 segments whose replicas (16) don't match the 
target state, _segmentsToRebalance
+ * is 4 and _replicasToRebalance is 16.
+ */
+public class TableRebalanceProgressStats {
+  public static class RebalanceStateStats {
+    public int _segmentsMissing;
+    public int _segmentsToRebalance;
+    public double _percentSegmentsToRebalance;
+    public int _replicasToRebalance;
+
+    RebalanceStateStats() {
+      _segmentsMissing = 0;
+      _segmentsToRebalance = 0;
+      _replicasToRebalance = 0;
+      _percentSegmentsToRebalance = 0.0;
+    }
+  }
+
+  // Done/In_progress/Failed
+  private String _status;
+  // When did Rebalance start
+  private Long _startTimeInMilliseconds;
+  // How long did rebalance take
+  private Long _timeToFinishInSeconds;

Review Comment:
   Same here, should we store primitive `long`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java:
##########
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.RebalanceConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <code>ZkBasedTableRebalanceObserver</code> observes rebalance progress and 
tracks rebalance status,
+ * stats in Zookeeper. This will be used to show the progress of rebalance to 
users via rebalanceStatus API.
+ */
+public class ZkBasedTableRebalanceObserver implements TableRebalanceObserver {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ZkBasedTableRebalanceObserver.class);
+  private final String _tableNameWithType;
+  private final String _rebalanceJobId;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+  private TableRebalanceProgressStats _tableRebalanceProgressStats;
+  // Keep track of number of updates. Useful during debugging.
+  private int _numUpdatesToZk;
+
+  public ZkBasedTableRebalanceObserver(String tableNameWithType, String 
rebalanceJobId,
+      PinotHelixResourceManager pinotHelixResourceManager) {
+    Preconditions.checkState(tableNameWithType != null, "Table name cannot be 
null");
+    Preconditions.checkState(rebalanceJobId != null, "rebalanceId cannot be 
null");
+    Preconditions.checkState(pinotHelixResourceManager != null, 
"PinotHelixManager cannot be null");
+    _tableNameWithType = tableNameWithType;
+    _rebalanceJobId = rebalanceJobId;
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tableRebalanceProgressStats = new TableRebalanceProgressStats();
+    _numUpdatesToZk = 0;
+  }
+
+  @Override
+  public void onTrigger(Trigger trigger, Map<String, Map<String, String>> 
currentState,
+      Map<String, Map<String, String>> targetState) {
+    switch (trigger) {
+      case START_TRIGGER:
+        updateOnStart(currentState, targetState);
+        trackStatsInZk();
+        break;
+      // Write to Zk if there's change since previous stats computation
+      case IDEAL_STATE_CHANGE_TRIGGER:
+        TableRebalanceProgressStats.RebalanceStateStats latest =
+            getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        if 
(TableRebalanceProgressStats.statsDiffer(_tableRebalanceProgressStats.getCurrentToTargetConvergence(),
+            latest)) {
+          _tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
+          trackStatsInZk();
+        }
+        break;
+      case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER:
+        latest = getDifferenceBetweenTableRebalanceStates(targetState, 
currentState);
+        if (TableRebalanceProgressStats.statsDiffer(
+            
_tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) 
{
+          
_tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest);
+          trackStatsInZk();
+        }
+        break;
+      default:
+        throw new IllegalArgumentException("Unimplemented trigger: " + 
trigger);
+    }
+  }
+
+  private void updateOnStart(Map<String, Map<String, String>> currentState,
+      Map<String, Map<String, String>> targetState) {
+    Preconditions.checkState(_tableRebalanceProgressStats.getStatus() != 
RebalanceResult.Status.IN_PROGRESS.toString(),
+        "Rebalance Observer onStart called multiple times");
+    
_tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS.toString());
+    _tableRebalanceProgressStats.setInitialToTargetStateConvergence(
+        getDifferenceBetweenTableRebalanceStates(targetState, currentState));
+    
_tableRebalanceProgressStats.setStartTimeInMilliseconds(System.currentTimeMillis());
+  }
+
+  @Override
+  public void onSuccess(String msg) {
+    Preconditions.checkState(_tableRebalanceProgressStats.getStatus() != 
RebalanceResult.Status.DONE.toString(),
+        "Table Rebalance already completed");
+    Long timeToFinishInSeconds =
+        (long) (System.currentTimeMillis() - 
_tableRebalanceProgressStats.getStartTimeInMilliseconds()) / 1000;

Review Comment:
   No need to cast, same for other places



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to