Jackie-Jiang commented on code in PR #10088: URL: https://github.com/apache/pinot/pull/10088#discussion_r1066430611
########## pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java: ########## @@ -698,6 +698,8 @@ public static class ControllerJob { * Segment reload job ZK props */ public static final String SEGMENT_RELOAD_JOB_SEGMENT_NAME = "segmentName"; + // Force commit job ZK props + public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentForceCommitted"; Review Comment: (minor) ```suggestion public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted"; ``` ########## pinot-controller/src/main/resources/app/pages/TenantDetails.tsx: ########## @@ -384,7 +384,7 @@ const TenantPageDetails = ({ match }: RouteComponentProps<Props>) => { setShowReloadStatusModal(true); const [reloadStatusData, tableJobsData] = await Promise.all([ PinotMethodUtils.reloadStatusOp(tableName, tableType), - PinotMethodUtils.fetchTableJobs(tableName), + PinotMethodUtils.fetchTableJobs(tableName, "RELOAD_SEGMENT,RELOAD_ALL_SEGMENTS"), Review Comment: Should we include `FORCE_COMMIT` here? ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java: ########## @@ -116,29 +123,67 @@ public Response resumeConsumption( @POST @Path("/tables/{tableName}/forceCommit") - @ApiOperation(value = "Force commit the current consuming segments", - notes = "Force commit the current segments in consuming state and restart consumption. " + @ApiOperation(value = "Force commit the current consuming segments", notes = + "Force commit the current segments in consuming state and restart consumption. " + "This should be used after schema/table config changes. " + "Please note that this is an asynchronous operation, " + "and 200 response does not mean it has actually been done already") - public Response forceCommit( + public SuccessResponse forceCommit( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); validate(tableNameWithType); + String submittedJobId = null; try { - _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); - return Response.ok().build(); + Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); + try { + String jobId = UUID.randomUUID().toString(); + _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted); + submittedJobId = jobId; + } catch (Exception e) { + LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e); + } } catch (Exception e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); } + + return new SuccessResponse( + String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s", + (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId)); } + @GET + @Path("segments/forceCommitStatus/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted " + + "force commit operation") + public Map<String, String> getForceCommitJobStatus( + @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId) + throws Exception { + Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId); + String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE); + Set<String> consumingSegmentCommitted = new HashSet<>(); + consumingSegmentCommitted = JsonUtils.stringToObject( + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), + consumingSegmentCommitted.getClass()); Review Comment: ```suggestion Set<String> consumingSegmentCommitted = JsonUtils.stringToObject( controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), Set.class); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java: ########## @@ -116,29 +123,67 @@ public Response resumeConsumption( @POST @Path("/tables/{tableName}/forceCommit") - @ApiOperation(value = "Force commit the current consuming segments", - notes = "Force commit the current segments in consuming state and restart consumption. " + @ApiOperation(value = "Force commit the current consuming segments", notes = + "Force commit the current segments in consuming state and restart consumption. " + "This should be used after schema/table config changes. " + "Please note that this is an asynchronous operation, " + "and 200 response does not mean it has actually been done already") - public Response forceCommit( + public SuccessResponse forceCommit( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); validate(tableNameWithType); + String submittedJobId = null; try { - _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); - return Response.ok().build(); + Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); + try { + String jobId = UUID.randomUUID().toString(); + _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted); + submittedJobId = jobId; + } catch (Exception e) { + LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e); + } } catch (Exception e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); } + + return new SuccessResponse( + String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s", + (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId)); } + @GET + @Path("segments/forceCommitStatus/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted " + + "force commit operation") + public Map<String, String> getForceCommitJobStatus( + @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId) + throws Exception { + Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId); + String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE); + Set<String> consumingSegmentCommitted = new HashSet<>(); + consumingSegmentCommitted = JsonUtils.stringToObject( + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), + consumingSegmentCommitted.getClass()); + Set<String> onlineSegmentsForTable = + _pinotHelixResourceManager.getOnlineSegmentsFromExternalView(tableNameWithType); + // Need to check how many are online + AtomicInteger onlineSegmentsCount = new AtomicInteger(0); Review Comment: This is single threaded, so no need to use `AtomicInteger` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java: ########## @@ -116,29 +123,67 @@ public Response resumeConsumption( @POST @Path("/tables/{tableName}/forceCommit") - @ApiOperation(value = "Force commit the current consuming segments", - notes = "Force commit the current segments in consuming state and restart consumption. " + @ApiOperation(value = "Force commit the current consuming segments", notes = + "Force commit the current segments in consuming state and restart consumption. " + "This should be used after schema/table config changes. " + "Please note that this is an asynchronous operation, " + "and 200 response does not mean it has actually been done already") - public Response forceCommit( + public SuccessResponse forceCommit( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); validate(tableNameWithType); + String submittedJobId = null; try { - _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); - return Response.ok().build(); + Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); + try { + String jobId = UUID.randomUUID().toString(); + _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted); + submittedJobId = jobId; + } catch (Exception e) { + LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e); + } } catch (Exception e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); } + + return new SuccessResponse( + String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s", + (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId)); } + @GET + @Path("segments/forceCommitStatus/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted " + + "force commit operation") + public Map<String, String> getForceCommitJobStatus( + @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId) + throws Exception { + Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId); + String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE); + Set<String> consumingSegmentCommitted = new HashSet<>(); + consumingSegmentCommitted = JsonUtils.stringToObject( + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), + consumingSegmentCommitted.getClass()); + Set<String> onlineSegmentsForTable = + _pinotHelixResourceManager.getOnlineSegmentsFromExternalView(tableNameWithType); + // Need to check how many are online + AtomicInteger onlineSegmentsCount = new AtomicInteger(0); + consumingSegmentCommitted.forEach(segmentName -> { + if (onlineSegmentsForTable.contains(segmentName)) { + onlineSegmentsCount.incrementAndGet(); + } + }); + + controllerJobZKMetadata.put("onlineSegmentCount", Integer.toString(onlineSegmentsCount.get())); Review Comment: Suggest returning the segment list not yet committed, and the count ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2062,13 +2064,15 @@ public Map<String, String> getControllerJobZKMetadata(String jobId) { * @param tableNameWithType the table for which jobs are to be fetched * @return A Map of jobId to job properties */ - public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType) { + public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType, Set<String> jobTypesToFilter) { Review Comment: ```suggestion public Map<String, Map<String, String>> getAllJobsForTable(String tableNameWithType, @Nullable Set<String> jobTypesToFilter) { ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -2094,7 +2098,21 @@ public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentNa jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent)); jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName); - return addReloadJobToZK(jobId, jobMetadata); + return addControllerJobToZK(jobId, jobMetadata); + } + + public boolean addNewForceCommitJob(String tableNameWithType, String jobId, Set<String> consumingSegmentsCommitted) + throws JsonProcessingException { + Map<String, String> jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); + jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT.toString()); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis())); + jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(consumingSegmentsCommitted.size())); Review Comment: Consuming segments count is not the same as messages sent. We probably don't need it ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java: ########## @@ -847,16 +847,19 @@ private JsonNode getAggregateMetadataFromServer(String tableNameWithType, List<S notes = "Get list of controller jobs for this table") public Map<String, Map<String, String>> getControllerJobs( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, - @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr - ) { + @ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String tableTypeStr, + @ApiParam(value = "Comma separated list of job types") @QueryParam("jobTypes") @Nullable String jobTypesString) { TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr); List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER); - + Set<String> jobTypesToFilter = null; + if (StringUtils.isNotEmpty(jobTypesString)) { + jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ","))); Review Comment: ```suggestion jobTypesToFilter = new HashSet<>(java.util.Arrays.asList(StringUtils.split(jobTypesString, ','))); ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java: ########## @@ -116,29 +123,67 @@ public Response resumeConsumption( @POST @Path("/tables/{tableName}/forceCommit") - @ApiOperation(value = "Force commit the current consuming segments", - notes = "Force commit the current segments in consuming state and restart consumption. " + @ApiOperation(value = "Force commit the current consuming segments", notes = + "Force commit the current segments in consuming state and restart consumption. " + "This should be used after schema/table config changes. " + "Please note that this is an asynchronous operation, " + "and 200 response does not mean it has actually been done already") - public Response forceCommit( + public SuccessResponse forceCommit( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); validate(tableNameWithType); + String submittedJobId = null; try { - _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); - return Response.ok().build(); + Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); + try { + String jobId = UUID.randomUUID().toString(); + _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted); + submittedJobId = jobId; + } catch (Exception e) { + LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e); + } } catch (Exception e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); } + + return new SuccessResponse( + String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s", + (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId)); } + @GET + @Path("segments/forceCommitStatus/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted " + + "force commit operation") + public Map<String, String> getForceCommitJobStatus( + @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId) + throws Exception { + Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId); + String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE); + Set<String> consumingSegmentCommitted = new HashSet<>(); + consumingSegmentCommitted = JsonUtils.stringToObject( + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), + consumingSegmentCommitted.getClass()); + Set<String> onlineSegmentsForTable = Review Comment: I think we should check IdealState instead of ExternalView. When IdealState is ONLINE, the commit process is done ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java: ########## @@ -116,29 +123,67 @@ public Response resumeConsumption( @POST @Path("/tables/{tableName}/forceCommit") - @ApiOperation(value = "Force commit the current consuming segments", - notes = "Force commit the current segments in consuming state and restart consumption. " + @ApiOperation(value = "Force commit the current consuming segments", notes = + "Force commit the current segments in consuming state and restart consumption. " + "This should be used after schema/table config changes. " + "Please note that this is an asynchronous operation, " + "and 200 response does not mean it has actually been done already") - public Response forceCommit( + public SuccessResponse forceCommit( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); validate(tableNameWithType); + String submittedJobId = null; try { - _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); - return Response.ok().build(); + Set<String> consumingSegmentsForceCommitted = _pinotLLCRealtimeSegmentManager.forceCommit(tableNameWithType); + try { + String jobId = UUID.randomUUID().toString(); + _pinotHelixResourceManager.addNewForceCommitJob(tableNameWithType, jobId, consumingSegmentsForceCommitted); + submittedJobId = jobId; + } catch (Exception e) { + LOGGER.error("Could not add force commit job metadata to ZK table : {}", tableNameWithType, e); + } } catch (Exception e) { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); } + + return new SuccessResponse( + String.format("Successfully submitted force commit job id. Job meta ZK storage status: %s, job id : %s", + (submittedJobId != null) ? "SUCCESS" : "FAILED", submittedJobId)); } + @GET + @Path("segments/forceCommitStatus/{jobId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted force commit operation", notes = "Get status for a submitted " + + "force commit operation") + public Map<String, String> getForceCommitJobStatus( + @ApiParam(value = "Force commit job id", required = true) @PathParam("jobId") String reloadJobId) + throws Exception { + Map<String, String> controllerJobZKMetadata = _pinotHelixResourceManager.getControllerJobZKMetadata(reloadJobId); + String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE); + Set<String> consumingSegmentCommitted = new HashSet<>(); + consumingSegmentCommitted = JsonUtils.stringToObject( + controllerJobZKMetadata.get(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST), + consumingSegmentCommitted.getClass()); + Set<String> onlineSegmentsForTable = Review Comment: Actually we cannot use this method because it will also track the CONSUMING segments -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org