sajjad-moradi commented on code in PR #8986: URL: https://github.com/apache/pinot/pull/8986#discussion_r918209300
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java: ########## @@ -49,27 +50,71 @@ HttpHeaders.AUTHORIZATION, in = ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY))) @Path("/") public class PinotRealtimeTableResource { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotRealtimeTableResource.class); + @Inject PinotHelixResourceManager _pinotHelixResourceManager; + @Inject + PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; + + @POST + @Path("/tables/{tableName}/pauseConsumption") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Pause consumption of a realtime table", + notes = "Pause the consumption of a realtime table") + public Response pauseConsumption( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); + validate(tableNameWithType); + try { + return Response.ok(_pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType)).build(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } + @POST @Path("/tables/{tableName}/resumeConsumption") @Produces(MediaType.APPLICATION_JSON) - @Consumes(MediaType.APPLICATION_JSON) - @ApiOperation(value = "Resume the consumption of a realtime table", - notes = "Resume the consumption of a realtime table") - public String resumeConsumption( - @ApiParam(value = "Name of the table", required = true) - @PathParam("tableName") String tableName) throws JsonProcessingException { - // TODO: Add util method for invoking periodic tasks + @ApiOperation(value = "Resume consumption of a realtime table", + notes = "Resume the consumption for a realtime table") + public Response resumeConsumption( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") 
String tableName) { String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); - Map<String, String> taskProperties = new HashMap<>(); - taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY, "true"); + validate(tableNameWithType); + try { + return Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } - Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager - .invokeControllerPeriodicTask(tableNameWithType, Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties); + @GET + @Path("/tables/{tableName}/pauseStatus") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Return pause status of a realtime table", + notes = "Return pause status of a realtime table along with list of consuming segments.") + public Response getConsumptionStatus( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName) { + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); + validate(tableNameWithType); + try { + return Response.ok().entity(_pinotLLCRealtimeSegmentManager.getPauseStatus(tableNameWithType)).build(); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e); + } + } - return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft() - + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 0) + "}"; + private void validate(String tableNameWithType) { + IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType); + if (idealState == null) { + throw new ControllerApplicationException(LOGGER, "Ideal State is null for table " + tableNameWithType, Review Comment: Done. 
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org