Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r929429595


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -459,9 +464,25 @@ public SuccessResponse reloadSegment(
     TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE;
     String tableNameWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0);
-    int numMessagesSent = _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
-    if (numMessagesSent > 0) {
-      return new SuccessResponse("Sent " + numMessagesSent + " reload messages");
+    Pair<Integer, String> msgInfo =
+        _pinotHelixResourceManager.reloadSegment(tableNameWithType, segmentName, forceDownload);
+    boolean zkJobMetaWriteSuccess = false;
+    if (msgInfo.getLeft() > 0) {
+      try {
+        if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
+            msgInfo.getLeft())) {
+          zkJobMetaWriteSuccess = true;
+        } else {
+          LOGGER.error("Failed to add reload segment job meta into zookeeper for table {}, segment {}",

Review Comment:
   (nit) We usually put `:` after the field name so the log line is easier to search. Same for other places.
   ```suggestion
             LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}",
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {

Review Comment:
   Annotate with `@Nullable`.
   Let's add Javadoc for this public method. Same for the other public methods.
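
   A minimal sketch of what the annotated, documented method might look like. It is not the PR's code: the class `NullableJavadocSketch`, the in-memory `_jobs` map, and `addJob` are hypothetical, and the nested `Nullable` annotation only stands in for `javax.annotation.Nullable` so the snippet compiles without extra dependencies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.HashMap;
import java.util.Map;

final class NullableJavadocSketch {
  // Stand-in for javax.annotation.Nullable (assumption: the real code would import that one).
  @Retention(RetentionPolicy.CLASS)
  @Target({ElementType.METHOD, ElementType.PARAMETER, ElementType.FIELD})
  @interface Nullable {
  }

  // Hypothetical in-memory replacement for the ZNRecord map fields (job id -> job metadata).
  private final Map<String, Map<String, String>> _jobs = new HashMap<>();

  void addJob(String jobId, Map<String, String> jobMetadata) {
    _jobs.put(jobId, jobMetadata);
  }

  /**
   * Returns the ZK metadata stored for the given controller job.
   *
   * @param jobId id of the controller job
   * @return the job metadata, or {@code null} if no job with this id exists
   */
  @Nullable
  Map<String, String> getControllerJobZKMetadata(String jobId) {
    return _jobs.get(jobId);
  }
}
```

   The `@return` tag spells out the `null` case so callers know they have to check for it.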



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props

Review Comment:
   (nit)
   ```suggestion
        * Controller job ZK props
   ```



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "controller.job.type";
+    public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "controller.job.table.name";
+    public static final String CONTROLLER_JOB_ID = "controller.job.id";
+    public static final String CONTROLLER_JOB_SUBMISSION_TIME = "controller.job.submission.time";
+    public static final String CONTROLLER_JOB_MESSAGES_COUNT = "controller.job.messages.count";
+
+    /**
+     * Segment reload task ZK props

Review Comment:
   (nit)
   ```suggestion
        * Segment reload job ZK props
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
+      return taskResourceZnRecord.getMapFields().get(taskId);
+    } else {
+      return null;
+    }
+  }
+
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+    String jobsResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(jobsResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord tableJobsRecord = _propertyStore.get(jobsResourcePath, null, -1);
+      Map<String, Map<String, String>> controllerJobs = tableJobsRecord.getMapFields();
+      return controllerJobs.entrySet().stream().filter(
+          job -> job.getValue().get(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE)
+              .equals(tableNameWithType)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
+      int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE, ControllerJobType.RELOAD_SEGMENT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, int numberOfMessagesSent) {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE,
+        ControllerJobType.RELOAD_ALL_SEGMENTS.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_SUBMISSION_TIME,
+        Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_MESSAGES_COUNT,
+        Integer.toString(numberOfMessagesSent));
+    return addReloadJobToZK(tableNameWithType, jobId, jobMetadata);
+  }
+
+  private boolean addReloadJobToZK(String tableNameWithType, String taskId, Map<String, String> jobMetadata) {
+    String jobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    Stat stat = new Stat();
+    ZNRecord tableJobsZnRecord = _propertyStore.get(jobResourcePath, stat, AccessOption.PERSISTENT);
+    jobMetadata.put(CommonConstants.ControllerJob.CONTROLLER_JOB_TABLE_NAME_WITH_TYPE, tableNameWithType);

Review Comment:
   Move this line into `addNewReloadSegmentJob()` and `addNewReloadAllSegmentsJob()` for readability.
   We want to keep the metadata creation in one place.
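
   A rough sketch of the intended shape, using plain maps rather than the real Pinot classes. The class `ReloadJobMetadataSketch`, its method names, and the value of `SEGMENT_NAME` are assumptions for illustration (the diff does not show that constant's definition); the other key strings are copied from the diff.

```java
import java.util.HashMap;
import java.util.Map;

final class ReloadJobMetadataSketch {
  static final String JOB_ID = "controller.job.id";
  static final String JOB_TYPE = "controller.job.type";
  static final String TABLE_NAME_WITH_TYPE = "controller.job.table.name";
  static final String SUBMISSION_TIME = "controller.job.submission.time";
  static final String MESSAGES_COUNT = "controller.job.messages.count";
  // Assumed value: the definition of this key is not part of the quoted diff.
  static final String SEGMENT_NAME = "segment.reload.job.segment.name";

  private ReloadJobMetadataSketch() {
  }

  // Every field, including the table name, is set where the metadata map is created.
  static Map<String, String> reloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
      int numMessagesSent) {
    Map<String, String> jobMetadata = new HashMap<>();
    jobMetadata.put(JOB_ID, jobId);
    jobMetadata.put(JOB_TYPE, "RELOAD_SEGMENT");
    jobMetadata.put(TABLE_NAME_WITH_TYPE, tableNameWithType);
    jobMetadata.put(SUBMISSION_TIME, Long.toString(System.currentTimeMillis()));
    jobMetadata.put(MESSAGES_COUNT, Integer.toString(numMessagesSent));
    jobMetadata.put(SEGMENT_NAME, segmentName);
    return jobMetadata;
  }

  static Map<String, String> reloadAllSegmentsJob(String tableNameWithType, String jobId, int numMessagesSent) {
    Map<String, String> jobMetadata = new HashMap<>();
    jobMetadata.put(JOB_ID, jobId);
    jobMetadata.put(JOB_TYPE, "RELOAD_ALL_SEGMENTS");
    jobMetadata.put(TABLE_NAME_WITH_TYPE, tableNameWithType);
    jobMetadata.put(SUBMISSION_TIME, Long.toString(System.currentTimeMillis()));
    jobMetadata.put(MESSAGES_COUNT, Integer.toString(numMessagesSent));
    return jobMetadata;
  }
}
```

   With this arrangement the ZK write helper only persists a finished map and no longer needs to mutate it.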



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java:
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import java.util.List;
+import javax.inject.Inject;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.server.starter.ServerInstance;
+import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class ControllerJobStatusResource {
+
+  @Inject
+  private ServerInstance _serverInstance;
+
+  @GET
+  @Path("/controllerJob/reloadStatus/{tableNameWithType}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given task")
+  public String taskStatus(@PathParam("tableNameWithType") String tableNameWithType,
+      @QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp,
+      @QueryParam("segmentName") String segmentName)
+      throws Exception {
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
+
+    if (segmentName == null) {
+      // All segments
+      List<SegmentDataManager> allSegments = tableDataManager.acquireAllSegments();
+      try {
+        long successCount = 0;
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) {
+            successCount++;
+          }
+        }
+        SegmentReloadStatusValue segmentReloadStatusValue =
+            new SegmentReloadStatusValue(allSegments.size(), successCount);
+        return JsonUtils.objectToString(segmentReloadStatusValue);
+      } finally {
+        for (SegmentDataManager segmentDataManager : allSegments) {
+          tableDataManager.releaseSegment(segmentDataManager);
+        }
+      }
+    } else {
+      SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
+      if (segmentDataManager == null) {
+        throw new WebApplicationException("Segment: " + segmentName + " is not found", Response.Status.NOT_FOUND);

Review Comment:
   Can we handle this properly on the controller side? Should we return 0/0 
instead?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +598,98 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)

Review Comment:
   `jobId` itself should be enough? The `tableName` can be extracted from the 
ZK metadata



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -577,6 +598,98 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadControllerJobStatusResponse getReloadJobStatus(
+      @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
+      @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Reload job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
+    String tableNameWithType = TableNameBuilder.forType(tableTypeFromRequest).tableNameWithType(tableName);
+    Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+
+    if (controllerJobZKMetadata == null) {
+      throw new ControllerApplicationException(LOGGER,
+          "Failed to find controller job: " + reloadJobId + " for table: " + tableNameWithType, Status.NOT_FOUND);
+    }
+
+    String singleSegmentName = null;
+    if (controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONTROLLER_JOB_TYPE).equals(
+        ControllerJobType.RELOAD_SEGMENT.toString())) {
+      singleSegmentName = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
+    }
+
+    BiMap<String, String> serverEndPoints =
+        _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();

Review Comment:
   We might not want to query all the servers, especially in a large shared
   cluster. We can get the ideal state of the table and use it to find the
   servers to query. We should also use the ideal state to determine the total
   number of segments to be reloaded, because when segments are moving around,
   a server might not have the correct view of the segments to be reloaded.
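
   To illustrate the idea, here is a small sketch that derives both the servers to query and the expected segment count from an ideal-state-style assignment map (segment name -> (server instance -> state)). It deliberately uses plain maps instead of Helix types; the class and method names are hypothetical, and skipping replicas whose state is `OFFLINE` is an assumption, not something the PR specifies.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class IdealStateServerLookupSketch {
  private IdealStateServerLookupSketch() {
  }

  /**
   * Builds server -> segments from an assignment map of segment -> (server -> state).
   * When singleSegmentName is non-null, only that segment is considered.
   */
  static Map<String, List<String>> serverToSegments(Map<String, Map<String, String>> idealStateAssignment,
      String singleSegmentName) {
    Map<String, List<String>> serverToSegments = new TreeMap<>();
    for (Map.Entry<String, Map<String, String>> segmentEntry : idealStateAssignment.entrySet()) {
      String segmentName = segmentEntry.getKey();
      if (singleSegmentName != null && !singleSegmentName.equals(segmentName)) {
        continue;
      }
      for (Map.Entry<String, String> replicaEntry : segmentEntry.getValue().entrySet()) {
        // Assumption: replicas marked OFFLINE in the ideal state are not expected to reload.
        if ("OFFLINE".equals(replicaEntry.getValue())) {
          continue;
        }
        serverToSegments.computeIfAbsent(replicaEntry.getKey(), k -> new ArrayList<>()).add(segmentName);
      }
    }
    return serverToSegments;
  }

  /** Total number of segment replicas expected to reload, summed over all servers. */
  static int totalSegmentsToReload(Map<String, List<String>> serverToSegments) {
    int total = 0;
    for (List<String> segments : serverToSegments.values()) {
      total += segments.size();
    }
    return total;
  }
}
```

   Only the servers that appear as keys need to be queried, and the total comes from the controller's view rather than from what each server currently hosts.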



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "controller.job.type";

Review Comment:
   We usually use camel case for key names in the ZNode. We may also simplify
   the keys, since they are always in the context of a controller job, e.g.
   `jobType`, `jobId`, `tableName`, `submissionTimeMs`, `messagesCount`, `reloadSegmentName`.
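
   For illustration, the constants could end up looking roughly like this. The key strings are the ones proposed above; the class name and the Java constant names are only placeholders, not what the PR has to use.

```java
final class ControllerJobKeysSketch {
  private ControllerJobKeysSketch() {
  }

  // Controller job ZK props (camel-case keys, no "controller.job." prefix)
  static final String JOB_TYPE = "jobType";
  static final String JOB_ID = "jobId";
  static final String TABLE_NAME_WITH_TYPE = "tableName";
  static final String SUBMISSION_TIME_MS = "submissionTimeMs";
  static final String MESSAGES_COUNT = "messagesCount";

  // Segment reload job ZK props
  static final String RELOAD_SEGMENT_NAME = "reloadSegmentName";
}
```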



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1984,6 +1987,82 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getControllerJobZKMetadata(String taskId) {
+    String controllerJobResourcePath = ZKMetadataProvider.constructPropertyStorePathForControllerJob();
+    if (_propertyStore.exists(controllerJobResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(controllerJobResourcePath, null, -1);

Review Comment:
   We can save one redundant ZK access and also avoid a race condition by reading
   the record once and checking for `null`. Same for other places.
   ```suggestion
       ZNRecord controllerJobRecord = _propertyStore.get(controllerJobResourcePath, null, -1);
       if (controllerJobRecord != null) {
   ```
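
   The difference can be seen with a toy store that counts accesses. Everything in this sketch is hypothetical (`CountingStore`, `checkThenGet`, `getOnce`, the `/jobs` path in the usage below); it assumes, as the suggestion implies, that `get` returns `null` for a missing path.

```java
import java.util.HashMap;
import java.util.Map;

final class SingleReadSketch {
  private SingleReadSketch() {
  }

  /** Hypothetical in-memory stand-in for the property store; counts how often it is accessed. */
  static final class CountingStore {
    private final Map<String, Map<String, Map<String, String>>> _records = new HashMap<>();
    int _accessCount = 0;

    void put(String path, Map<String, Map<String, String>> mapFields) {
      _records.put(path, mapFields);
    }

    boolean exists(String path) {
      _accessCount++;
      return _records.containsKey(path);
    }

    // Assumption: returns null when nothing is stored under the path.
    Map<String, Map<String, String>> get(String path) {
      _accessCount++;
      return _records.get(path);
    }
  }

  // Pattern in the diff: exists() followed by get() costs two accesses, and with a real
  // store the record could be removed between the two calls, so get() may still return null.
  static Map<String, String> checkThenGet(CountingStore store, String path, String jobId) {
    if (store.exists(path)) {
      Map<String, Map<String, String>> record = store.get(path);
      return record.get(jobId);
    }
    return null;
  }

  // Suggested pattern: a single read plus a null check.
  static Map<String, String> getOnce(CountingStore store, String path, String jobId) {
    Map<String, Map<String, String>> record = store.get(path);
    return record != null ? record.get(jobId) : null;
  }
}
```

   For a record stored under `/jobs`, `checkThenGet` touches the store twice while `getOnce` touches it once, and `getOnce` needs no separate code path for a missing record.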



##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -557,6 +557,23 @@ public static class Minion {
     public static final String CONFIG_OF_MINION_QUERY_REWRITER_CLASS_NAMES = "pinot.minion.query.rewriter.class.names";
   }
 
+  public static class ControllerJob {
+    /**
+     * Task ZK props
+     */
+    public static final String CONTROLLER_JOB_TYPE = "controller.job.type";
+    public static final String CONTROLLER_JOB_TABLE_NAME_WITH_TYPE = "controller.job.table.name";
+    public static final String CONTROLLER_JOB_ID = "controller.job.id";
+    public static final String CONTROLLER_JOB_SUBMISSION_TIME = "controller.job.submission.time";
+    public static final String CONTROLLER_JOB_MESSAGES_COUNT = "controller.job.messages.count";
+
+    /**
+     * Segment reload task ZK props
+     */
+    public static final Integer MAXIMUM_RELOAD_JOBS_IN_ZK = 100;

Review Comment:
   This one applies to all jobs, not just the reload jobs.




