Jackie-Jiang commented on code in PR #10359:
URL: https://github.com/apache/pinot/pull/10359#discussion_r1139302569


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -212,13 +214,13 @@ public Response getPauseStatus(
       + "Note that the partitionToOffsetMap has been deprecated and will be 
removed in the next release. The info is "
       + "now embedded within each partition's state as currentOffsetsMap.")
   @ApiResponses(value = {
-      @ApiResponse(code = 200, message = "Success"),
-      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 404, 
message = "Table not found"),

Review Comment:
   (nit) Revert



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -636,7 +636,7 @@ public RebalanceResult rebalance(
           + "segments in a round-robin fashion as if adding new segments to an 
empty table)") @DefaultValue("false")
   @QueryParam("bootstrap") boolean bootstrap,
       @ApiParam(value = "Whether to allow downtime for the rebalance") 
@DefaultValue("false") @QueryParam("downtime")
-          boolean downtime, @ApiParam(
+      boolean downtime, @ApiParam(

Review Comment:
   (nit) The formatting doesn't seem correct for this method



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -712,6 +715,37 @@ public String getTableState(
     }
   }
 
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.UPDATE)
+  @Path("/rebalanceStatus/{jobId}")
+  @ApiOperation(value = "Gets detailed stats of a rebalance operation",
+      notes = "Gets detailed stats of a rebalance operation")
+  public ServerRebalanceJobStatusResponse rebalanceStatus(
+      @ApiParam(value = "Rebalance Job Id", required = true) 
@PathParam("jobId") String jobId)
+      throws JsonProcessingException {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadataForJobType(jobId, 
ControllerJobType.TABLE_REBALANCE);
+
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
+          Response.Status.NOT_FOUND);
+    }
+    TableRebalanceProgressStats tableRebalanceProgressStats =
+        
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
+            TableRebalanceProgressStats.class);
+    Long timeSinceStartInSecs = 0L;
+    if 
(!tableRebalanceProgressStats.getStatus().toString().equals(RebalanceResult.Status.DONE.toString()))
 {
+      timeSinceStartInSecs =
+          (long) (System.currentTimeMillis() - 
tableRebalanceProgressStats.getStartTimeInMilliseconds()) / 1000;

Review Comment:
   (minor) The cast is redundant
   ```suggestion
             (System.currentTimeMillis() - tableRebalanceProgressStats.getStartTimeInMilliseconds()) / 1000;
   ```
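
For illustration only, a minimal sketch of why the cast can go: both operands are `long`, so the subtraction and the division by the `int` literal are already carried out in `long` arithmetic. `ElapsedTime` and `elapsedSeconds` are hypothetical names, not code from this PR.

```java
public class ElapsedTime {
  // Hypothetical helper (not part of the PR): whole seconds elapsed between two epoch-millisecond timestamps.
  static long elapsedSeconds(long nowMs, long startMs) {
    // long - long yields long, and dividing by the int literal promotes it to long, so no cast is needed.
    return (nowMs - startMs) / 1000;
  }

  public static void main(String[] args) {
    // 6500 ms elapsed, truncated to 6 whole seconds.
    System.out.println(elapsedSeconds(10_500L, 4_000L));
  }
}
```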



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/ServerRebalanceJobStatusResponse.java:
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+
+
+public class ServerRebalanceJobStatusResponse {
+  private long _timeElapsedSinceStartInSeconds;
+
+  @JsonProperty("tableRebalanceProgressStats")

Review Comment:
   (minor) The annotation is redundant since the name is the same



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2090,21 +2093,27 @@ public Map<String, String> 
getControllerJobZKMetadata(String jobId) {
    * @return A Map of jobId to job properties
    */
   public Map<String, Map<String, String>> getAllJobsForTable(String 
tableNameWithType,
-      @Nullable Set<String> jobTypesToFilter) {
-    String jobsResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob();
-    try {
-      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, 
-1);
-      Map<String, Map<String, String>> controllerJobs = 
tableJobsRecord.getMapFields();
-      return controllerJobs.entrySet().stream().filter(
-              job -> 
job.getValue().get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE).equals(tableNameWithType)
-                  && (jobTypesToFilter == null || jobTypesToFilter.contains(
-                      
job.getValue().get(CommonConstants.ControllerJob.JOB_TYPE))))
-          .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-    } catch (ZkNoNodeException e) {
-      LOGGER.warn("Could not find controller job node for table : {}", 
tableNameWithType, e);
+      Set<ControllerJobType> jobTypesToIterate) {

Review Comment:
   (nit)
   ```suggestion
         Set<ControllerJobType> jobTypes) {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java:
##########
@@ -29,23 +29,31 @@
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class RebalanceResult {
+  private final String _rebalanceId;
   private final Status _status;
   private final Map<InstancePartitionsType, InstancePartitions> 
_instanceAssignment;
   private final Map<String, InstancePartitions> _tierInstanceAssignment;
-  private final Map<String, Map<String, String>> _segmentAssignment;
+  private final Map<String, Map<String, String>> _targetSegmentAssignment;
   private final String _description;
 
   @JsonCreator
-  public RebalanceResult(@JsonProperty(value = "status", required = true) 
Status status,
+  public RebalanceResult(@JsonProperty(value = "rebalanceId", required = true) 
String rebalanceId,
+      @JsonProperty(value = "status", required = true) Status status,
       @JsonProperty(value = "description", required = true) String description,
       @JsonProperty("instanceAssignment") @Nullable 
Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
       @JsonProperty("tierInstanceAssignment") @Nullable Map<String, 
InstancePartitions> tierInstanceAssignment,
-      @JsonProperty("segmentAssignment") @Nullable Map<String, Map<String, 
String>> segmentAssignment) {
+      @JsonProperty("targetSegmentAssignment") @Nullable Map<String, 
Map<String, String>> targetSegmentAssignment) {

Review Comment:
   Let's not change this name, because the rename is incompatible with the current 
response format (imagine client code that looks for this field)



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/RebalanceConfigConstants.java:
##########
@@ -25,6 +25,12 @@ public class RebalanceConfigConstants {
   private RebalanceConfigConstants() {
   }
 
+  // Unique Id for rebalance
+  public static final String REBALANCE_ID = "rebalanceId";

Review Comment:
   (minor) Since it is already under the scope of rebalance
   ```suggestion
     public static final String JOB_ID = "jobId";
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -712,6 +715,37 @@ public String getTableState(
     }
   }
 
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.UPDATE)
+  @Path("/rebalanceStatus/{jobId}")
+  @ApiOperation(value = "Gets detailed stats of a rebalance operation",
+      notes = "Gets detailed stats of a rebalance operation")
+  public ServerRebalanceJobStatusResponse rebalanceStatus(
+      @ApiParam(value = "Rebalance Job Id", required = true) 
@PathParam("jobId") String jobId)
+      throws JsonProcessingException {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadataForJobType(jobId, 
ControllerJobType.TABLE_REBALANCE);
+
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
+          Response.Status.NOT_FOUND);
+    }
+    TableRebalanceProgressStats tableRebalanceProgressStats =
+        
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
+            TableRebalanceProgressStats.class);
+    Long timeSinceStartInSecs = 0L;
+    if 
(!tableRebalanceProgressStats.getStatus().toString().equals(RebalanceResult.Status.DONE.toString()))
 {

Review Comment:
   (minor)
   ```suggestion
       if (tableRebalanceProgressStats.getStatus() != RebalanceResult.Status.DONE) {
   ```
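
A small sketch of the idea behind this suggestion, assuming `getStatus()` returns the enum itself. The `Status` enum below is a stand-in for `RebalanceResult.Status`; its constants other than `DONE` are assumptions, and `isNotDone` is a hypothetical helper.

```java
public class EnumCompare {
  // Stand-in for RebalanceResult.Status; constants other than DONE are assumed for illustration.
  enum Status { IN_PROGRESS, DONE, FAILED }

  // Hypothetical helper: enum constants are singletons, so a reference comparison is sufficient.
  // Unlike status.toString().equals(...), this does not throw if status is null.
  static boolean isNotDone(Status status) {
    return status != Status.DONE;
  }

  public static void main(String[] args) {
    System.out.println(isNotDone(Status.IN_PROGRESS));
    System.out.println(isNotDone(Status.DONE));
  }
}
```

Comparing the constants directly also lets the compiler catch a type mismatch, which a string comparison would silently accept.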



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -712,6 +715,37 @@ public String getTableState(
     }
   }
 
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authenticate(AccessType.UPDATE)
+  @Path("/rebalanceStatus/{jobId}")
+  @ApiOperation(value = "Gets detailed stats of a rebalance operation",
+      notes = "Gets detailed stats of a rebalance operation")
+  public ServerRebalanceJobStatusResponse rebalanceStatus(
+      @ApiParam(value = "Rebalance Job Id", required = true) 
@PathParam("jobId") String jobId)
+      throws JsonProcessingException {
+    Map<String, String> controllerJobZKMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadataForJobType(jobId, 
ControllerJobType.TABLE_REBALANCE);
+
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER, "Failed to find 
controller job id: " + jobId,
+          Response.Status.NOT_FOUND);
+    }
+    TableRebalanceProgressStats tableRebalanceProgressStats =
+        
JsonUtils.stringToObject(controllerJobZKMetadata.get(RebalanceConfigConstants.REBALANCE_PROGRESS_STATS),
+            TableRebalanceProgressStats.class);
+    Long timeSinceStartInSecs = 0L;

Review Comment:
   (minor) use primitive `long`



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2070,13 +2072,14 @@ private Set<String> getAllInstancesForTable(String 
tableNameWithType) {
   }
 
   /**
-   * Returns the ZK metdata for the given jobId
+   * Returns the ZK metdata for the given jobId and jobType
    * @param jobId the id of the job
+   * @param jobType Job Path
    * @return Map representing the job's ZK properties
    */
   @Nullable
-  public Map<String, String> getControllerJobZKMetadata(String jobId) {
-    String controllerJobResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+  public Map<String, String> getControllerJobZKMetadataForJobType(String 
jobId, ControllerJobType jobType) {

Review Comment:
   (minor) Suggest keeping the existing method name
   ```suggestion
     public Map<String, String> getControllerJobZKMetadata(String jobId, ControllerJobType jobType) {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -212,13 +214,13 @@ public Response getPauseStatus(
       + "Note that the partitionToOffsetMap has been deprecated and will be 
removed in the next release. The info is "
       + "now embedded within each partition's state as currentOffsetsMap.")
   @ApiResponses(value = {
-      @ApiResponse(code = 200, message = "Success"),
-      @ApiResponse(code = 404, message = "Table not found"),
+      @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 404, 
message = "Table not found"),
       @ApiResponse(code = 500, message = "Internal server error")
   })
   public ConsumingSegmentInfoReader.ConsumingSegmentsInfoMap 
getConsumingSegmentsInfo(
-      @ApiParam(value = "Realtime table name with or without type", required = 
true,
-          example = "myTable | myTable_REALTIME") @PathParam("tableName") 
String realtimeTableName) {
+      @ApiParam(value = "Realtime table name with or without type", required = 
true, example = "myTable | "
+          + "myTable_REALTIME")
+      @PathParam("tableName") String realtimeTableName) {

Review Comment:
   (nit) Revert



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultTableRebalanceObserver.java:
##########
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import java.util.Map;
+
+
+/**
+ * Default No-op TableRebalanceObserver.
+ */
+public class DefaultTableRebalanceObserver implements TableRebalanceObserver {

Review Comment:
   (minor) Suggest renaming it to `NoOpTableRebalanceObserver` for readability
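
To show what the name would convey, here is a rough sketch of the no-op (null object) pattern. The `RebalanceObserver` interface and its `onProgress` method are hypothetical stand-ins, since the real `TableRebalanceObserver` methods are not shown in this review.

```java
import java.util.Map;

public class NoOpObserverSketch {
  // Hypothetical stand-in for TableRebalanceObserver; the real interface may look different.
  interface RebalanceObserver {
    void onProgress(Map<String, String> stats);
  }

  // The name states the behavior directly: every callback does nothing.
  static final class NoOpRebalanceObserver implements RebalanceObserver {
    @Override
    public void onProgress(Map<String, String> stats) {
      // Intentionally empty.
    }
  }

  static final class Rebalancer {
    private final RebalanceObserver _observer;

    Rebalancer(RebalanceObserver observer) {
      // Fall back to the no-op observer so the rest of the code never needs a null check.
      _observer = observer != null ? observer : new NoOpRebalanceObserver();
    }

    String run() {
      _observer.onProgress(Map.of("status", "IN_PROGRESS"));
      return "DONE";
    }
  }

  // Runs a rebalancer that was constructed without an observer.
  static String runWithoutObserver() {
    return new Rebalancer(null).run();
  }

  public static void main(String[] args) {
    System.out.println(runWithoutObserver());
  }
}
```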



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java:
##########
@@ -29,23 +29,31 @@
 
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class RebalanceResult {
+  private final String _rebalanceId;
   private final Status _status;
   private final Map<InstancePartitionsType, InstancePartitions> 
_instanceAssignment;
   private final Map<String, InstancePartitions> _tierInstanceAssignment;
-  private final Map<String, Map<String, String>> _segmentAssignment;
+  private final Map<String, Map<String, String>> _targetSegmentAssignment;
   private final String _description;
 
   @JsonCreator
-  public RebalanceResult(@JsonProperty(value = "status", required = true) 
Status status,
+  public RebalanceResult(@JsonProperty(value = "rebalanceId", required = true) 
String rebalanceId,

Review Comment:
   (minor)
   ```suggestion
     public RebalanceResult(@JsonProperty(value = "jobId", required = true) String jobId,
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java:
##########
@@ -115,15 +116,30 @@ public class TableRebalancer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableRebalancer.class);
   private final HelixManager _helixManager;
   private final HelixDataAccessor _helixDataAccessor;
+  private final TableRebalanceObserver _tableRebalanceObserver;
 
-  public TableRebalancer(HelixManager helixManager) {
+  public TableRebalancer(HelixManager helixManager, @Nullable 
TableRebalanceObserver tableRebalanceObserver) {
     _helixManager = helixManager;
+    if (tableRebalanceObserver != null) {
+      _tableRebalanceObserver = tableRebalanceObserver;
+    } else {
+      _tableRebalanceObserver = new DefaultTableRebalanceObserver();
+    }
     _helixDataAccessor = helixManager.getHelixDataAccessor();
   }
 
+  public TableRebalancer(HelixManager helixManager) {
+    this(helixManager, null);
+  }
+
+  public static String createUniqueRebalanceJobIdentifier() {
+    return UUID.randomUUID().toString();
+  }
+
   public RebalanceResult rebalance(TableConfig tableConfig, Configuration 
rebalanceConfig) {
     long startTimeMs = System.currentTimeMillis();
     String tableNameWithType = tableConfig.getTableName();
+    String rebalanceJobId = 
rebalanceConfig.getString(RebalanceConfigConstants.REBALANCE_ID);

Review Comment:
   To be more robust, we can create a unique job id when one is not passed in 
the rebalance config. With this change, we also don't need to modify the code 
paths where the rebalance is not triggered by the controller (client code 
doesn't need to change either).
   
   We should also include the rebalance job id in the log messages for easier 
tracking.
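
A possible shape for the fallback, as a sketch only: the `Map`-based config is a simplification of the real `Configuration` object, the `jobId` key follows the constant name suggested earlier in this review, and `resolveJobId` is a hypothetical helper.

```java
import java.util.Map;
import java.util.UUID;

public class RebalanceJobIdSketch {
  // Key name follows the suggested JOB_ID constant; treat it as an assumption.
  static final String JOB_ID = "jobId";

  // Hypothetical helper: use the caller-provided job id, or generate one when it is absent.
  static String resolveJobId(Map<String, String> rebalanceConfig) {
    String jobId = rebalanceConfig.get(JOB_ID);
    return jobId != null ? jobId : UUID.randomUUID().toString();
  }

  public static void main(String[] args) {
    // No job id passed in, so one is generated; including it in the message makes the run traceable.
    String jobId = resolveJobId(Map.of());
    System.out.println("Start rebalancing table: myTable_OFFLINE with job id: " + jobId);
  }
}
```

With a fallback like this, callers that build their own rebalance config keep working unchanged, and every log line for a run can carry the same id.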



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

