sajjad-moradi commented on code in PR #8986:
URL: https://github.com/apache/pinot/pull/8986#discussion_r918209300


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java:
##########
@@ -49,27 +50,71 @@
     HttpHeaders.AUTHORIZATION, in = 
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
 @Path("/")
 public class PinotRealtimeTableResource {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PinotRealtimeTableResource.class);
+
   @Inject
   PinotHelixResourceManager _pinotHelixResourceManager;
 
+  @Inject
+  PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
+
+  @POST
+  @Path("/tables/{tableName}/pauseConsumption")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Pause consumption of a realtime table",
+      notes = "Pause the consumption of a realtime table")
+  public Response pauseConsumption(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName) {
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(tableNameWithType);
+    try {
+      return 
Response.ok(_pinotLLCRealtimeSegmentManager.pauseConsumption(tableNameWithType)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
+
   @POST
   @Path("/tables/{tableName}/resumeConsumption")
   @Produces(MediaType.APPLICATION_JSON)
-  @Consumes(MediaType.APPLICATION_JSON)
-  @ApiOperation(value = "Resume the consumption of a realtime table",
-      notes = "Resume the consumption of a realtime table")
-  public String resumeConsumption(
-      @ApiParam(value = "Name of the table", required = true)
-      @PathParam("tableName") String tableName) throws JsonProcessingException 
{
-    // TODO: Add util method for invoking periodic tasks
+  @ApiOperation(value = "Resume consumption of a realtime table",
+      notes = "Resume the consumption for a realtime table")
+  public Response resumeConsumption(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName) {
     String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
-    Map<String, String> taskProperties = new HashMap<>();
-    
taskProperties.put(RealtimeSegmentValidationManager.RECREATE_DELETED_CONSUMING_SEGMENT_KEY,
 "true");
+    validate(tableNameWithType);
+    try {
+      return 
Response.ok(_pinotLLCRealtimeSegmentManager.resumeConsumption(tableNameWithType)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
 
-    Pair<String, Integer> taskExecutionDetails = _pinotHelixResourceManager
-        .invokeControllerPeriodicTask(tableNameWithType, 
Constants.REALTIME_SEGMENT_VALIDATION_MANAGER, taskProperties);
+  @GET
+  @Path("/tables/{tableName}/pauseStatus")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Return pause status of a realtime table",
+      notes = "Return pause status of a realtime table along with list of 
consuming segments.")
+  public Response getConsumptionStatus(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName) {
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    validate(tableNameWithType);
+    try {
+      return 
Response.ok().entity(_pinotLLCRealtimeSegmentManager.getPauseStatus(tableNameWithType)).build();
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+  }
 
-    return "{\"Log Request Id\": \"" + taskExecutionDetails.getLeft()
-        + "\",\"Controllers notified\":" + (taskExecutionDetails.getRight() > 
0) + "}";
+  private void validate(String tableNameWithType) {
+    IdealState idealState = 
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    if (idealState == null) {
+      throw new ControllerApplicationException(LOGGER, "Ideal State is null 
for table " + tableNameWithType,

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to