Jackie-Jiang commented on code in PR #10088:
URL: https://github.com/apache/pinot/pull/10088#discussion_r1066430611


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -698,6 +698,8 @@ public static class ControllerJob {
      * Segment reload job ZK props
      */
     public static final String SEGMENT_RELOAD_JOB_SEGMENT_NAME = "segmentName";
+    // Force commit job ZK props
+    public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentForceCommitted";

Review Comment:
   (minor)
   ```suggestion
       public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted";
   ```



##########
pinot-controller/src/main/resources/app/pages/TenantDetails.tsx:
##########
@@ -384,7 +384,7 @@ const TenantPageDetails = ({ match }: RouteComponentProps<Props>) => {
       setShowReloadStatusModal(true);
       const [reloadStatusData, tableJobsData] = await Promise.all([
         PinotMethodUtils.reloadStatusOp(tableName, tableType),
-        PinotMethodUtils.fetchTableJobs(tableName),
+        PinotMethodUtils.fetchTableJobs(tableName, "RELOAD_SEGMENT,RELOAD_ALL_SEGMENTS"),

Review Comment:
   Should we include `FORCE_COMMIT` here?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -116,29 +123,67 @@ public Response resumeConsumption(
 
   @POST
   @Path("/tables/{tableName}/forceCommit")
-  @ApiOperation(value = "Force commit the current consuming segments",
-      notes = "Force commit the current segments in consuming state and restart consumption. "
+  @ApiOperation(value = "Force commit the current consuming segments", notes =
+      "Force commit the current segments in consuming state and restart consumption. "
           + "This should be used after schema/table config changes. "
           + "Please note that this is an asynchronous operation, "
           + "and 200 response does not mean it has actually been done already")
-  public Response forceCommit(
+  public SuccessResponse forceCommit(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validate(tableNameWithType);
+    String submittedJobId = null;
     try {
-      _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
-      return Response.ok().build();
+      Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
+      try {
+        String jobId = UUID.randomUUID().toString();
+        _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted);
+        submittedJobId = jobId;
+      } catch (Exception e) {
+        LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e);
+      }
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
     }
+
+    return new SuccessResponse(
+        String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s",
+            (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId));
   }
 
+  @GET
+  @Path("segments/forceCommitStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted "
+      + "force commit operation")
+  public Map<String, String> getForceCommitJobStatus(
+      @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    Set<String> consumingSegmentCommitted = new HashSet<>();
+    consumingSegmentCommitted = JsonUtils.stringToObject(
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST),
+        consumingSegmentCommitted.getClass());

Review Comment:
   ```suggestion
       Set<String> consumingSegmentCommitted = JsonUtils.stringToObject(
           controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), Set.class);
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -116,29 +123,67 @@ public Response resumeConsumption(
 
   @POST
   @Path("/tables/{tableName}/forceCommit")
-  @ApiOperation(value = "Force commit the current consuming segments",
-      notes = "Force commit the current segments in consuming state and restart consumption. "
+  @ApiOperation(value = "Force commit the current consuming segments", notes =
+      "Force commit the current segments in consuming state and restart consumption. "
           + "This should be used after schema/table config changes. "
           + "Please note that this is an asynchronous operation, "
           + "and 200 response does not mean it has actually been done already")
-  public Response forceCommit(
+  public SuccessResponse forceCommit(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validate(tableNameWithType);
+    String submittedJobId = null;
     try {
-      _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
-      return Response.ok().build();
+      Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
+      try {
+        String jobId = UUID.randomUUID().toString();
+        _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted);
+        submittedJobId = jobId;
+      } catch (Exception e) {
+        LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e);
+      }
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
     }
+
+    return new SuccessResponse(
+        String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s",
+            (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId));
   }
 
+  @GET
+  @Path("segments/forceCommitStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted "
+      + "force commit operation")
+  public Map<String, String> getForceCommitJobStatus(
+      @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    Set<String> consumingSegmentCommitted = new HashSet<>();
+    consumingSegmentCommitted = JsonUtils.stringToObject(
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST),
+        consumingSegmentCommitted.getClass());
+    Set<String> onlineSegmentsForTable =
+        _pinotHelixResourceManager.getOnlineSegmentsFromExternalView(tableNameWithType);
+    // Need to check how many are online
+    AtomicInteger onlineSegmentsCount = new AtomicInteger(0);

Review Comment:
   This is single-threaded, so there is no need to use `AtomicInteger`.
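
   For illustration only, a minimal sketch of the same count done with a plain `int` and an enhanced `for` loop. The class name `OnlineSegmentCounter` and the method `countOnline` are hypothetical and not part of this PR; the two sets stand in for `consumingSegmentCommitted` and `onlineSegmentsForTable`.

```java
import java.util.Set;

// Hypothetical helper, not part of the PR: shows the count without AtomicInteger.
final class OnlineSegmentCounter {
  private OnlineSegmentCounter() {
  }

  // Counts how many of the force-committed segments also appear in the online set.
  static int countOnline(Set<String> forceCommittedSegments, Set<String> onlineSegments) {
    int onlineSegmentsCount = 0;
    for (String segmentName : forceCommittedSegments) {
      if (onlineSegments.contains(segmentName)) {
        onlineSegmentsCount++;
      }
    }
    return onlineSegmentsCount;
  }
}
```

   A local `int` works here because a plain loop, unlike a lambda body, is allowed to mutate a local variable.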



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -116,29 +123,67 @@ public Response resumeConsumption(
 
   @POST
   @Path("/tables/{tableName}/forceCommit")
-  @ApiOperation(value = "Force commit the current consuming segments",
-      notes = "Force commit the current segments in consuming state and restart consumption. "
+  @ApiOperation(value = "Force commit the current consuming segments", notes =
+      "Force commit the current segments in consuming state and restart consumption. "
           + "This should be used after schema/table config changes. "
           + "Please note that this is an asynchronous operation, "
           + "and 200 response does not mean it has actually been done already")
-  public Response forceCommit(
+  public SuccessResponse forceCommit(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validate(tableNameWithType);
+    String submittedJobId = null;
     try {
-      _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
-      return Response.ok().build();
+      Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
+      try {
+        String jobId = UUID.randomUUID().toString();
+        _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted);
+        submittedJobId = jobId;
+      } catch (Exception e) {
+        LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e);
+      }
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
     }
+
+    return new SuccessResponse(
+        String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s",
+            (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId));
   }
 
+  @GET
+  @Path("segments/forceCommitStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted "
+      + "force commit operation")
+  public Map<String, String> getForceCommitJobStatus(
+      @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    Set<String> consumingSegmentCommitted = new HashSet<>();
+    consumingSegmentCommitted = JsonUtils.stringToObject(
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST),
+        consumingSegmentCommitted.getClass());
+    Set<String> onlineSegmentsForTable =
+        _pinotHelixResourceManager.getOnlineSegmentsFromExternalView(tableNameWithType);
+    // Need to check how many are online
+    AtomicInteger onlineSegmentsCount = new AtomicInteger(0);
+    consumingSegmentCommitted.forEach(segmentName -> {
+      if (onlineSegmentsForTable.contains(segmentName)) {
+        onlineSegmentsCount.incrementAndGet();
+      }
+    });
+
+    controllerJobZKMetadata.put("onlineSegmentCount", Integer.toString(onlineSegmentsCount.get()));

Review Comment:
   Suggest returning the list of segments that are not yet committed, along with their count.
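
   A rough sketch of what that could look like, under stated assumptions: the class `ForceCommitProgress`, the method `pendingSegments`, and the idea of passing in an already-computed "committed" set are all hypothetical and only meant to show the set difference.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of the PR.
final class ForceCommitProgress {
  private ForceCommitProgress() {
  }

  // Returns the force-committed segments that are not in the committed set yet.
  // A TreeSet keeps the result sorted so the response is stable across calls.
  static Set<String> pendingSegments(Set<String> forceCommittedSegments, Set<String> committedSegments) {
    Set<String> pending = new TreeSet<>(forceCommittedSegments);
    pending.removeAll(committedSegments);
    return pending;
  }
}
```

   The count then falls out as `pendingSegments(...).size()`, so the list and the number cannot disagree.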



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2062,13 +2064,15 @@ public Map<String, String> getControllerJobZKMetadata(String jobId) {
    * @param tableNameWithType the table for which jobs are to be fetched
    * @return A Map of jobId to job properties
    */
-  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) {
+  public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType, Set<String> jobTypesToFilter) {

Review Comment:
   ```suggestion
     public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType, @Nullable Set<String> jobTypesToFilter) {
   ```
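
   To spell out the contract that `@Nullable` implies, here is a minimal sketch in which a `null` filter means "return every job". The class `JobTypeFilter`, the method `filterByType`, and the `"jobType"` metadata key are assumptions made for the example, not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the PR.
final class JobTypeFilter {
  // Assumed metadata key holding the job type; the real constant lives in CommonConstants.
  static final String JOB_TYPE_KEY = "jobType";

  private JobTypeFilter() {
  }

  // Keeps only jobs whose type is in jobTypesToFilter; a null filter keeps all jobs.
  static Map<String, Map<String, String>> filterByType(Map<String, Map<String, String>> jobsById,
      Set<String> jobTypesToFilter) {
    Map<String, Map<String, String>> result = new HashMap<>();
    for (Map.Entry<String, Map<String, String>> entry : jobsById.entrySet()) {
      String jobType = entry.getValue().get(JOB_TYPE_KEY);
      if (jobTypesToFilter == null || (jobType != null && jobTypesToFilter.contains(jobType))) {
        result.put(entry.getKey(), entry.getValue());
      }
    }
    return result;
  }
}
```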



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2094,7 +2098,21 @@ public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentNa
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent));
     jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
-    return addReloadJobToZK(jobId, jobMetadata);
+    return addControllerJobToZK(jobId, jobMetadata);
+  }
+
+  public boolean addNewForceCommitJob(String tableNameWithType, String jobId, Set<String> consumingSegmentsCommitted)
+      throws JsonProcessingException {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT.toString());
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(consumingSegmentsCommitted.size()));

Review Comment:
   The consuming segment count is not the same as the number of messages sent. We probably don't need it.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java:
##########
@@ -847,16 +847,19 @@ private JsonNode getAggregateMetadataFromServer(String tableNameWithType, List<S
       notes = "Get list of controller jobs for this table")
   public Map<String, Map<String, String>> getControllerJobs(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName,
-      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr
-  ) {
+      @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr,
+      @ApiParam(value = "Comma separated list of job types") @QueryParam("jobTypes") @Nullable String jobTypesString) {
     TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
     List<String> tableNamesWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
             LOGGER);
-
+    Set<String> jobTypesToFilter = null;
+    if (StringUtils.isNotEmpty(jobTypesString)) {
+      jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ",")));

Review Comment:
   ```suggestion
         jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ',')));
   ```
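
   As a JDK-only sketch of the same parsing step, assuming the intent is "an absent or empty parameter means no filter": the class `JobTypesParam` and the method `parse` are hypothetical, and `String.split` is swapped in for `StringUtils.split`, with empty tokens skipped by hand.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper, not part of the PR.
final class JobTypesParam {
  private JobTypesParam() {
  }

  // Parses a comma-separated query parameter; returns null when nothing was supplied.
  static Set<String> parse(String jobTypesString) {
    if (jobTypesString == null || jobTypesString.isEmpty()) {
      return null;
    }
    Set<String> jobTypes = new LinkedHashSet<>();
    for (String token : jobTypesString.split(",")) {
      // Skip empty tokens produced by inputs such as "A,,B".
      if (!token.isEmpty()) {
        jobTypes.add(token);
      }
    }
    return jobTypes;
  }
}
```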



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -116,29 +123,67 @@ public Response resumeConsumption(
 
   @POST
   @Path("/tables/{tableName}/forceCommit")
-  @ApiOperation(value = "Force commit the current consuming segments",
-      notes = "Force commit the current segments in consuming state and restart consumption. "
+  @ApiOperation(value = "Force commit the current consuming segments", notes =
+      "Force commit the current segments in consuming state and restart consumption. "
           + "This should be used after schema/table config changes. "
           + "Please note that this is an asynchronous operation, "
           + "and 200 response does not mean it has actually been done already")
-  public Response forceCommit(
+  public SuccessResponse forceCommit(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validate(tableNameWithType);
+    String submittedJobId = null;
     try {
-      _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
-      return Response.ok().build();
+      Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
+      try {
+        String jobId = UUID.randomUUID().toString();
+        _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted);
+        submittedJobId = jobId;
+      } catch (Exception e) {
+        LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e);
+      }
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
     }
+
+    return new SuccessResponse(
+        String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s",
+            (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId));
   }
 
+  @GET
+  @Path("segments/forceCommitStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted "
+      + "force commit operation")
+  public Map<String, String> getForceCommitJobStatus(
+      @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    Set<String> consumingSegmentCommitted = new HashSet<>();
+    consumingSegmentCommitted = JsonUtils.stringToObject(
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST),
+        consumingSegmentCommitted.getClass());
+    Set<String> onlineSegmentsForTable =

Review Comment:
   I think we should check IdealState instead of ExternalView. When IdealState is ONLINE, the commit process is done.
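
   A minimal sketch of that check, assuming the ideal state can be viewed as a nested map of segment name to (instance name to state) and that a segment counts as committed only when it has at least one replica and every replica is `ONLINE`. The class `IdealStateCommitCheck` and both method names are hypothetical; no Helix API is used here.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of the PR; plain maps stand in for the ideal state.
final class IdealStateCommitCheck {
  private IdealStateCommitCheck() {
  }

  // True when the segment has replicas and all of them are ONLINE in the ideal state.
  static boolean isCommitted(Map<String, String> instanceStates) {
    if (instanceStates == null || instanceStates.isEmpty()) {
      return false;
    }
    for (String state : instanceStates.values()) {
      if (!"ONLINE".equals(state)) {
        return false;
      }
    }
    return true;
  }

  // Returns the force-committed segments that are still CONSUMING or missing.
  static Set<String> segmentsYetToBeCommitted(Set<String> forceCommittedSegments,
      Map<String, Map<String, String>> idealStateAssignment) {
    Set<String> pending = new TreeSet<>();
    for (String segmentName : forceCommittedSegments) {
      if (!isCommitted(idealStateAssignment.get(segmentName))) {
        pending.add(segmentName);
      }
    }
    return pending;
  }
}
```

   Treating a missing segment as "not committed" is a choice made for the sketch; the real endpoint may want to report it separately.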



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -116,29 +123,67 @@ public Response resumeConsumption(
 
   @POST
   @Path("/tables/{tableName}/forceCommit")
-  @ApiOperation(value = "Force commit the current consuming segments",
-      notes = "Force commit the current segments in consuming state and restart consumption. "
+  @ApiOperation(value = "Force commit the current consuming segments", notes =
+      "Force commit the current segments in consuming state and restart consumption. "
           + "This should be used after schema/table config changes. "
           + "Please note that this is an asynchronous operation, "
           + "and 200 response does not mean it has actually been done already")
-  public Response forceCommit(
+  public SuccessResponse forceCommit(
       @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) {
     String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName);
     validate(tableNameWithType);
+    String submittedJobId = null;
     try {
-      _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
-      return Response.ok().build();
+      Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType);
+      try {
+        String jobId = UUID.randomUUID().toString();
+        _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted);
+        submittedJobId = jobId;
+      } catch (Exception e) {
+        LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e);
+      }
     } catch (Exception e) {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
     }
+
+    return new SuccessResponse(
+        String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s",
+            (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId));
   }
 
+  @GET
+  @Path("segments/forceCommitStatus/{jobId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted "
+      + "force commit operation")
+  public Map<String, String> getForceCommitJobStatus(
+      @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId)
+      throws Exception {
+    Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId);
+    String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
+    Set<String> consumingSegmentCommitted = new HashSet<>();
+    consumingSegmentCommitted = JsonUtils.stringToObject(
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST),
+        consumingSegmentCommitted.getClass());
+    Set<String> onlineSegmentsForTable =

Review Comment:
   Actually, we cannot use this method because it will also track the CONSUMING segments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

