Jackie-Jiang commented on code in PR #10359:
URL: https://github.com/apache/pinot/pull/10359#discussion_r1137793075


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java:
##########
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+
+public class ServerRebalanceJobStatusResponse {
+    private Long _timeElapsedSinceStartInSeconds;
+    @JsonProperty("tableRebalanceProgressStats")
+    private TableRebalanceProgressStats _tableRebalanceProgressStats;
+    public void setTimeElapsedSinceStartInSeconds(Long timeElapsedSinceStart) {

Review Comment:
   (code style) Add some empty lines between methods for readability



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -115,15 +116,21 @@ public class TableRebalancer {
   private static final Logger LOGGER = LoggerFactory.getLogger(TableRebalancer.class);
   private final HelixManager _helixManager;
   private final HelixDataAccessor _helixDataAccessor;
+  private final TableRebalanceObserver _tableRebalanceObserver;
 
-  public TableRebalancer(HelixManager helixManager) {
+  public TableRebalancer(HelixManager helixManager, TableRebalanceObserver tableRebalanceObserver) {

Review Comment:
   Make the `tableRebalanceObserver` nullable, and create a no-op observer when it is null. Keep the current constructor and add a new constructor for backward compatibility:
   ```suggestion
     public TableRebalancer(HelixManager helixManager, @Nullable TableRebalanceObserver tableRebalanceObserver) {
       ...
     }
     
     public TableRebalancer(HelixManager helixManager) {
       this(helixManager, null);
     }
   ```
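
The nullable-observer idea can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `RebalanceObserverSketch`, `RebalancerSketch`, and `onTrigger` are hypothetical names, and a plain `String` stands in for `HelixManager`.

```java
// Hypothetical stand-in for TableRebalanceObserver; the real interface may differ.
interface RebalanceObserverSketch {
  void onTrigger(String event);

  // Shared no-op instance used whenever the caller passes null.
  RebalanceObserverSketch NO_OP = event -> { };
}

// Hypothetical stand-in for TableRebalancer; a String replaces HelixManager.
class RebalancerSketch {
  private final String _helixManager;
  private final RebalanceObserverSketch _observer;

  // New constructor: the observer may be null.
  RebalancerSketch(String helixManager, RebalanceObserverSketch observer) {
    _helixManager = helixManager;
    _observer = observer != null ? observer : RebalanceObserverSketch.NO_OP;
  }

  // Existing single-argument constructor kept for backward compatibility.
  RebalancerSketch(String helixManager) {
    this(helixManager, null);
  }

  void rebalance(String tableNameWithType) {
    // No null check is needed at the call sites.
    _observer.onTrigger("START " + tableNameWithType);
    _observer.onTrigger("DONE " + tableNameWithType);
  }
}
```

With this shape, existing callers that use the single-argument constructor keep compiling, and the rebalance logic never has to branch on whether an observer was supplied.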



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java:
##########
@@ -0,0 +1,41 @@
+package org.apache.pinot.controller.api.resources;
+
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+
+public class ServerRebalanceJobStatusResponse {
+    private Long _timeElapsedSinceStartInSeconds;

Review Comment:
   Should this be a primitive `long`?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2116,28 +2140,30 @@ public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithT
    * @return boolean representing success / failure of the ZK write step
    */
   public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
-      int numMessagesSent) {
+                                        int numMessagesSent) {

Review Comment:
   (code format) Reformat the changes here, and do the same in the other places.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2089,22 +2100,35 @@ public Map<String, String> getControllerJobZKMetadata(String jobId) {
    * @param tableNameWithType the table for which jobs are to be fetched
    * @return A Map of jobId to job properties
    */
-  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType,
-      @Nullable Set<String> jobTypesToFilter) {
-    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
-    try {
-      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
-      Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields();
-      return controllerJobs.entrySet().stream().filter(
-              job -> job.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE).equals(tableNameWithType)
-                  && (jobTypesToFilter == null || jobTypesToFilter.contains(
-                      job.getValue().get(CommonConstants.ControllerJob.JOB_TYPE))))
-          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    } catch (ZkNoNodeException e) {
-      LOGGER.warn("Could not find controller job node for table : {}", tableNameWithType, e);
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType, Set<String> jobTypesToIterate) {

Review Comment:
   (minor) Same here: we could pass a `Set<ControllerJobType>` instead of a `Set<String>`.
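
A rough sketch of filtering by an enum set rather than by strings. Everything here is illustrative: `JobKind` is a hypothetical two-value stand-in for `ControllerJobType`, and the `"jobType"` and `"tableName"` keys are placeholders for the `CommonConstants.ControllerJob` constants.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical subset of ControllerJobType, limited to values visible in the diff.
enum JobKind { RELOAD_ALL_SEGMENTS, TABLE_REBALANCE }

class JobFilterSketch {
  // Keeps only the jobs for the given table whose type is in the requested set.
  // Note: JobKind.valueOf throws if the stored type is missing or unknown.
  static Map<String, Map<String, String>> filter(Map<String, Map<String, String>> allJobs,
      String tableNameWithType, Set<JobKind> jobTypes) {
    Map<String, Map<String, String>> result = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : allJobs.entrySet()) {
      Map<String, String> metadata = entry.getValue();
      JobKind type = JobKind.valueOf(metadata.get("jobType"));
      if (jobTypes.contains(type) && tableNameWithType.equals(metadata.get("tableName"))) {
        result.put(entry.getKey(), metadata);
      }
    }
    return result;
  }
}
```

A caller could then write `JobFilterSketch.filter(jobs, "myTable_OFFLINE", EnumSet.of(JobKind.TABLE_REBALANCE))`, so a misspelled job type becomes a compile error rather than an empty result.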



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java:
##########
@@ -0,0 +1,41 @@
+package org.apache.pinot.controller.api.resources;
+
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+
+public class ServerRebalanceJobStatusResponse {
+    private Long _timeElapsedSinceStartInSeconds;
+    @JsonProperty("tableRebalanceProgressStats")
+    private TableRebalanceProgressStats _tableRebalanceProgressStats;
+    public void setTimeElapsedSinceStartInSeconds(Long timeElapsedSinceStart) {
+        _timeElapsedSinceStartInSeconds = timeElapsedSinceStart;
+    }
+    public void setTableRebalanceProgressStats(TableRebalanceProgressStats tableRebalanceProgressStats) {
+        _tableRebalanceProgressStats = tableRebalanceProgressStats;
+    }
+    public TableRebalanceProgressStats getTableRebalanceProgressStats() {
+        return _tableRebalanceProgressStats;
+    }
+    public double getTimeElapsedSinceStartInSeconds() {

Review Comment:
   This should return `long`
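
For illustration, a simplified version of the response class with a primitive field and a matching `long` getter. `RebalanceJobStatusSketch` is a hypothetical name, and the stats field and Jackson annotations are left out. The getter body is not shown in the hunk above; assuming it returns the field directly, a boxed `Long` that was never set would throw a `NullPointerException` when unboxed, which the primitive avoids.

```java
// Simplified stand-in for ServerRebalanceJobStatusResponse (stats field omitted).
class RebalanceJobStatusSketch {
  // Primitive field: defaults to 0 and can never be null.
  private long _timeElapsedSinceStartInSeconds;

  public void setTimeElapsedSinceStartInSeconds(long timeElapsedSinceStartInSeconds) {
    _timeElapsedSinceStartInSeconds = timeElapsedSinceStartInSeconds;
  }

  // Getter type matches the field type, so no implicit widening to double.
  public long getTimeElapsedSinceStartInSeconds() {
    return _timeElapsedSinceStartInSeconds;
  }
}
```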



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2089,22 +2100,35 @@ public Map<String, String> getControllerJobZKMetadata(String jobId) {
    * @param tableNameWithType the table for which jobs are to be fetched
    * @return A Map of jobId to job properties
    */
-  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType,
-      @Nullable Set<String> jobTypesToFilter) {
-    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
-    try {
-      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
-      Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields();
-      return controllerJobs.entrySet().stream().filter(
-              job -> job.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE).equals(tableNameWithType)
-                  && (jobTypesToFilter == null || jobTypesToFilter.contains(
-                      job.getValue().get(CommonConstants.ControllerJob.JOB_TYPE))))
-          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    } catch (ZkNoNodeException e) {
-      LOGGER.warn("Could not find controller job node for table : {}", tableNameWithType, e);
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType, Set<String> jobTypesToIterate) {
+    Map<String, Map<String, String>> controllerJobs = new HashMap<>();
+    for (String jobType : jobTypesToIterate) {
+      String jobsResourcePath;
+      if (jobType.equals(ControllerJobType.TABLE_REBALANCE.name())) {
+        jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJobType(jobType);
+      } else {
+        // For other types, we will continue to use the root path until we migrate
+        // other types to use separate nodes based on jobType like TABLE_REBALANCE
+        jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+      }
+      try {
+        ZNRecord znRecord = _propertyStore.get(jobsResourcePath, null, -1);
+        if (znRecord != null) {
+          Map<String, Map<String, String>> tableJobsRecord = znRecord.getMapFields();
+          for (Map.Entry<String, Map<String, String>> tableEntry : tableJobsRecord.entrySet()) {
+            if (tableEntry.getValue().get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType)
+                && tableEntry.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)
+                .equals(tableNameWithType)) {
+              controllerJobs.put(tableEntry.getKey(), tableEntry.getValue());
+            }
+          }
+        }
+      } catch (ZkNoNodeException e) {
+        LOGGER.warn("Could not find controller job node for table : {} resource: {}", tableNameWithType, jobType,

Review Comment:
   ```suggestion
           LOGGER.warn("Could not find controller job node for table: {}, type: {}", tableNameWithType, jobType,
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -3124,13 +3176,30 @@ private PinotResourceManagerResponse enableInstance(String instanceName, boolean
         "Instance: " + instanceName + (enableInstance ? " enable" : " disable") + " failed, timeout");
   }
 
-  public RebalanceResult rebalanceTable(String tableNameWithType, Configuration rebalanceConfig)
+  /**
+   * Entry point for table Rebalacing.
+   * @param tableNameWithType
+   * @param rebalanceConfig
+   * @param helixZkManager
+   * @return RebalanceResult
+   * @throws TableNotFoundException
+   */
+  public static RebalanceResult rebalanceTable(String tableNameWithType, Configuration rebalanceConfig,

Review Comment:
   Suggest not making it a static method. We should only use `ZkBasedTableRebalanceObserver` when the rebalance is triggered by the controller, and this method shouldn't be used by the tooling (`PinotTableRebalancer`).



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2154,13 +2180,39 @@ public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
-    return addControllerJobToZK(jobId, jobMetadata);
+    return addControllerJobToZK(jobId, jobMetadata, ZKMetadataProvider.constructPropertyStorePathForControllerJob(),
+            _propertyStore);
   }
 
-  private boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata) {
-    String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+  public static boolean addRebalanceResultToZk(String tableNameWithType,
+                                               String jobId,
+                                               TableRebalanceProgressStats tableRebalanceProgressStats,
+                                               ZkHelixPropertyStore<ZNRecord> propertyStore) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE.name());
+    try {
+      jobMetadata.put(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS,
+              JsonUtils.objectToString(tableRebalanceProgressStats));
+    } catch (JsonProcessingException e) {
+      LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}",
+              jobId, e);
+    }
+    return addControllerJobToZK(jobId, jobMetadata,
+                                ZKMetadataProvider.constructPropertyStorePathForControllerJobType(
+                                ControllerJobType.TABLE_REBALANCE.name()),
+                                propertyStore);
+  }
+
+  private static boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata,

Review Comment:
   Suggest not making it a static method. We may pass the `PinotHelixResourceManager` when constructing the `ZkBasedTableRebalanceObserver`. We should only construct `ZkBasedTableRebalanceObserver` from the `PinotHelixResourceManager`, to ensure it is only used when the rebalance is triggered by the controller.
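
One possible shape for this, sketched under heavy assumptions: `ResourceManagerSketch` and `ZkObserverSketch` are hypothetical stand-ins for `PinotHelixResourceManager` and `ZkBasedTableRebalanceObserver`, ZooKeeper is replaced by an in-memory map, and `newRebalanceObserver` and `persistProgress` are invented names. The metadata keys are placeholders as well.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for PinotHelixResourceManager; a map replaces ZooKeeper.
class ResourceManagerSketch {
  private final Map<String, Map<String, String>> _jobStore = new HashMap<>();

  // Instance method: writing a job requires a live manager.
  boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata) {
    _jobStore.put(jobId, jobMetadata);
    return true;
  }

  // The manager hands itself to the observer it creates.
  ZkObserverSketch newRebalanceObserver(String tableNameWithType, String jobId) {
    return new ZkObserverSketch(tableNameWithType, jobId, this);
  }

  Map<String, String> getJob(String jobId) {
    return _jobStore.get(jobId);
  }
}

// Hypothetical stand-in for ZkBasedTableRebalanceObserver.
class ZkObserverSketch {
  private final String _tableNameWithType;
  private final String _jobId;
  private final ResourceManagerSketch _resourceManager;

  ZkObserverSketch(String tableNameWithType, String jobId, ResourceManagerSketch resourceManager) {
    _tableNameWithType = tableNameWithType;
    _jobId = jobId;
    _resourceManager = resourceManager;
  }

  // Non-static: everything except the stats payload is already a member variable.
  boolean persistProgress(String progressStatsJson) {
    Map<String, String> jobMetadata = new HashMap<>();
    jobMetadata.put("tableName", _tableNameWithType);
    jobMetadata.put("jobId", _jobId);
    jobMetadata.put("progressStats", progressStatsJson);
    return _resourceManager.addControllerJobToZK(_jobId, jobMetadata);
  }
}
```

Because the observer needs a manager instance to do anything, standalone tooling that never builds a manager would have no natural way to write job status to ZK.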



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2070,13 +2073,21 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
   }
 
   /**
-   * Returns the ZK metdata for the given jobId
+   * Returns the ZK metdata for the given jobId and jobType
    * @param jobId the id of the job
+   * @param jobType Job Path
    * @return Map representing the job's ZK properties
    */
   @Nullable
-  public Map<String, String> getControllerJobZKMetadata(String jobId) {
-    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+  public Map<String, String> getControllerJobZKMetadataForJobType(String jobId, String jobType) {

Review Comment:
   (minor) We may pass the `ControllerJobType` and use `if (jobType == ControllerJobType.TABLE_REBALANCE)`
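
A small sketch of what the enum-based check might look like. `JobTypeSketch` is a hypothetical subset of `ControllerJobType`, `pathFor` is an invented helper, and `"/CONTROLLER_JOBS"` is a placeholder for whatever `ZKMetadataProvider` actually returns.

```java
// Hypothetical subset of ControllerJobType, limited to values visible in the diff.
enum JobTypeSketch { RELOAD_ALL_SEGMENTS, TABLE_REBALANCE }

class JobPathSketch {
  // Placeholder root path; the real value comes from ZKMetadataProvider.
  static final String ROOT_PATH = "/CONTROLLER_JOBS";

  static String pathFor(JobTypeSketch jobType) {
    // Enum constants are singletons, so == is an identity check
    // and does not throw when jobType is null.
    if (jobType == JobTypeSketch.TABLE_REBALANCE) {
      return ROOT_PATH + "/" + jobType.name();
    }
    // Other job types keep using the root path.
    return ROOT_PATH;
  }
}
```

Compared with `jobType.equals(ControllerJobType.TABLE_REBALANCE.name())` on a `String`, the enum parameter rules out typos at compile time and removes the string conversion.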



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2154,13 +2180,39 @@ public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
-    return addControllerJobToZK(jobId, jobMetadata);
+    return addControllerJobToZK(jobId, jobMetadata, ZKMetadataProvider.constructPropertyStorePathForControllerJob(),
+            _propertyStore);
   }
 
-  private boolean addControllerJobToZK(String jobId, Map<String, String> jobMetadata) {
-    String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+  public static boolean addRebalanceResultToZk(String tableNameWithType,

Review Comment:
   I don't think this method belongs in this class. Suggest moving it to `ZkBasedTableRebalanceObserver` and changing it to a non-static method. All the arguments should already be member variables.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java:
##########
@@ -0,0 +1,41 @@
+package org.apache.pinot.controller.api.resources;
+
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+
+public class ServerRebalanceJobStatusResponse {

Review Comment:
   (code style) The indentation is still not correct. Please reformat the changes. Same for other files.



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -112,12 +112,16 @@ public static String constructPropertyStorePathForInstancePartitions(String inst
     return StringUtil.join("/", PROPERTYSTORE_INSTANCE_PARTITIONS_PREFIX, instancePartitionsName);
   }
 
+  public static String constructPropertyStorePathForResource(String resourceName) {
+    return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName);
+  }
+
   public static String constructPropertyStorePathForControllerJob() {
     return StringUtil.join("/", PROPERTYSTORE_CONTROLLER_JOBS_PREFIX);
   }
 
-  public static String constructPropertyStorePathForResource(String resourceName) {
-    return StringUtil.join("/", PROPERTYSTORE_SEGMENTS_PREFIX, resourceName);
+  public static String constructPropertyStorePathForControllerJobType(String jobType) {

Review Comment:
   (minor)
   ```suggestion
     public static String constructPropertyStorePathForControllerJob(String jobType) {
   ```
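
The rename amounts to overloading the existing method name. A minimal sketch, assuming the new method joins the prefix and the job type with `/` (its body is not shown in the hunk); `JobPathOverloadSketch` is a hypothetical class and the prefix value is a placeholder for `PROPERTYSTORE_CONTROLLER_JOBS_PREFIX`.

```java
class JobPathOverloadSketch {
  // Placeholder for PROPERTYSTORE_CONTROLLER_JOBS_PREFIX; the real value may differ.
  private static final String CONTROLLER_JOBS_PREFIX = "/CONTROLLER_JOBS";

  // Existing no-argument variant: the root path for controller jobs.
  static String constructPropertyStorePathForControllerJob() {
    return CONTROLLER_JOBS_PREFIX;
  }

  // Overload with the same name: the per-job-type path under the root.
  static String constructPropertyStorePathForControllerJob(String jobType) {
    return String.join("/", CONTROLLER_JOBS_PREFIX, jobType);
  }
}
```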



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

