Jackie-Jiang commented on code in PR #16045:
URL: https://github.com/apache/pinot/pull/16045#discussion_r2146237825


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND);
     }
   }
+
+  @DELETE
+  @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")

Review Comment:
   We usually use `tables` as the path prefix. We don't need the table type,
since this only applies to `REALTIME` tables.
   
   ```suggestion
     @Path("/tables/{tableName}/{instanceId}/ingestionMetrics")
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND);
     }
   }
+
+  @DELETE
+  @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action =
+      Actions.Table.DELETE_INGESTION_METRICS)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance",
+      notes = "Removes ingestion-related metrics from serverInstance for partition(s) under the specified table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully removed ingestion metrics."),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  public SuccessResponse removeIngestionMetrics(
+      @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType")
+      String tableNameWithType,
+      @ApiParam(value = "Instance name of the server", required = true) @PathParam("instanceName")
+      String instanceName,
+      @ApiParam(value = "Comma-separated list of partition group IDs (optional)") @QueryParam("partitionGroupId")

Review Comment:
   Does it take a comma-separated list, or does it have to be specified as
multiple parameters, such as `partitionId=1&partitionId=2`?
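
For context on the question above: as far as I know, JAX-RS fills a collection-typed `@QueryParam` from repeated parameters (`partitionId=1&partitionId=2`) and does not split a comma-separated value, so `partitionId=1,2` would reach the converter as the single string `1,2`. If both forms should be accepted, the raw values could be normalized by hand. The sketch below is plain Java with no JAX-RS dependency; `PartitionIdParamSketch` and `parsePartitionIds` are hypothetical names for illustration, not part of the PR.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the PR: accepts both repeated and
// comma-separated forms of a partition ID query parameter.
final class PartitionIdParamSketch {
  private PartitionIdParamSketch() {
  }

  // rawValues is assumed to hold one entry per occurrence of the query parameter,
  // e.g. ["1", "2"] for partitionId=1&partitionId=2 or ["1,2"] for partitionId=1,2.
  static Set<Integer> parsePartitionIds(List<String> rawValues) {
    Set<Integer> partitionIds = new LinkedHashSet<>();
    if (rawValues == null) {
      return partitionIds;
    }
    for (String rawValue : rawValues) {
      for (String token : rawValue.split(",")) {
        String trimmed = token.trim();
        if (!trimmed.isEmpty()) {
          // Throws NumberFormatException on non-numeric input; a real endpoint
          // would likely translate that into a 400 response.
          partitionIds.add(Integer.parseInt(trimmed));
        }
      }
    }
    return partitionIds;
  }

  public static void main(String[] args) {
    System.out.println(parsePartitionIds(List.of("1", "2"))); // [1, 2]
    System.out.println(parsePartitionIds(List.of("1,2")));    // [1, 2]
  }
}
```

Whichever form the endpoint settles on, the `@ApiParam` description should say so explicitly.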



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -319,6 +321,21 @@ public void stopTrackingPartitionIngestionDelay(int partitionId) {
     removePartitionId(partitionId);
   }
 
+  /**
+   * Handles all partition removal event. This must be invoked when we stop serving partitions for this table in the
+   * current server.
+   *
+   * @return Set of partitionIds for which ingestion metrics were removed.
+   */
+  public Set<Integer> stopTrackingPartitionIngestionDelay() {
+    Set<Integer> removedPartitionIds = new HashSet<>();

Review Comment:
   I feel it is safer to first initialize `removedPartitionIds = new
HashSet<>(_ingestionInfoMap.keySet())` to avoid iterating over and modifying
the same map at the same time, even though a concurrent map can probably
handle it.
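
A minimal sketch of the snapshot-then-remove pattern suggested above. The class `SnapshotRemovalSketch`, the `Long` value type, and the helper methods are stand-ins chosen for illustration; only `_ingestionInfoMap` and `removedPartitionIds` come from the PR, and the real tracker does more work per partition than a plain `remove`.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the tracker, reduced to the map handling.
final class SnapshotRemovalSketch {
  // The value type is a placeholder; the real map holds ingestion info objects.
  private final Map<Integer, Long> _ingestionInfoMap = new ConcurrentHashMap<>();

  void track(int partitionId, long ingestionTimeMs) {
    _ingestionInfoMap.put(partitionId, ingestionTimeMs);
  }

  // Copies the key set first, then removes entries by iterating over the copy,
  // so the loop never walks the map it is mutating.
  Set<Integer> stopTrackingAll() {
    Set<Integer> removedPartitionIds = new HashSet<>(_ingestionInfoMap.keySet());
    for (Integer partitionId : removedPartitionIds) {
      _ingestionInfoMap.remove(partitionId);
    }
    return removedPartitionIds;
  }

  int trackedCount() {
    return _ingestionInfoMap.size();
  }

  public static void main(String[] args) {
    SnapshotRemovalSketch tracker = new SnapshotRemovalSketch();
    tracker.track(0, 100L);
    tracker.track(1, 200L);
    System.out.println(tracker.stopTrackingAll().size()); // 2
    System.out.println(tracker.trackedCount());           // 0
  }
}
```

One consequence of the snapshot: a partition added after the copy is taken is neither removed nor reported, which seems like the right behavior for a "remove what is tracked now" call.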



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND);
     }
   }
+
+  @DELETE
+  @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action =
+      Actions.Table.DELETE_INGESTION_METRICS)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance",
+      notes = "Removes ingestion-related metrics from serverInstance for partition(s) under the specified table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully removed ingestion metrics."),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  public SuccessResponse removeIngestionMetrics(
+      @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType")
+      String tableNameWithType,
+      @ApiParam(value = "Instance name of the server", required = true) @PathParam("instanceName")
+      String instanceName,
+      @ApiParam(value = "Comma-separated list of partition group IDs (optional)") @QueryParam("partitionGroupId")
+      Set<Integer> partitionGroupIds,
+      @Context HttpHeaders headers) {
+    try {
+      tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST);
+    }
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      throw new ControllerApplicationException(LOGGER, "Table " + tableNameWithType + " should be a realtime table.",
+          Response.Status.BAD_REQUEST);
+    }
+    String serverEndpoint;
+    try {
+      BiMap<String, String> dataInstanceAdminEndpoints =
+          _pinotHelixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(instanceName));
+      serverEndpoint = dataInstanceAdminEndpoints.get(instanceName);
+      Preconditions.checkNotNull(serverEndpoint, "Server endpoint not found for instance: " + instanceName);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to get server endpoint for instance: " + instanceName,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    StringBuilder uriBuilder = new StringBuilder(serverEndpoint)
+        .append("/tables/")
+        .append(tableNameWithType)
+        .append("/ingestionMetrics");
+
+    if (partitionGroupIds != null && !partitionGroupIds.isEmpty()) {

Review Comment:
   (minor)
   ```suggestion
       if (CollectionUtils.isNotEmpty(partitionIds)) {
   ```



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java:
##########
@@ -319,6 +321,21 @@ public void stopTrackingPartitionIngestionDelay(int partitionId) {
     removePartitionId(partitionId);
   }
 
+  /**
+   * Handles all partition removal event. This must be invoked when we stop serving partitions for this table in the
+   * current server.
+   *
+   * @return Set of partitionIds for which ingestion metrics were removed.
+   */
+  public Set<Integer> stopTrackingPartitionIngestionDelay() {

Review Comment:
   ```suggestion
     public Set<Integer> stopTrackingIngestionDelayForAllPartitions() {
   ```



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND);
     }
   }
+
+  @DELETE
+  @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action =
+      Actions.Table.DELETE_INGESTION_METRICS)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance",
+      notes = "Removes ingestion-related metrics from serverInstance for partition(s) under the specified table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully removed ingestion metrics."),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  public SuccessResponse removeIngestionMetrics(
+      @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType")
+      String tableNameWithType,
+      @ApiParam(value = "Instance name of the server", required = true) @PathParam("instanceName")
+      String instanceName,
+      @ApiParam(value = "Comma-separated list of partition group IDs (optional)") @QueryParam("partitionGroupId")
+      Set<Integer> partitionGroupIds,
+      @Context HttpHeaders headers) {
+    try {
+      tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST);
+    }
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      throw new ControllerApplicationException(LOGGER, "Table " + tableNameWithType + " should be a realtime table.",
+          Response.Status.BAD_REQUEST);
+    }
+    String serverEndpoint;
+    try {
+      BiMap<String, String> dataInstanceAdminEndpoints =
+          _pinotHelixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(instanceName));
+      serverEndpoint = dataInstanceAdminEndpoints.get(instanceName);
+      Preconditions.checkNotNull(serverEndpoint, "Server endpoint not found for instance: " + instanceName);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to get server endpoint for instance: " + instanceName,
+          Response.Status.INTERNAL_SERVER_ERROR);

Review Comment:
   Should we count this as a bad request?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND);
     }
   }
+
+  @DELETE
+  @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action =
+      Actions.Table.DELETE_INGESTION_METRICS)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance",

Review Comment:
   Let's call it `partitionId`. I believe `partitionGroupId` was introduced for
historical reasons (probably for the high-level consumer, which has already
been removed), so let's use `partitionId` going forward for conciseness.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java:
##########
@@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade
       throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND);
     }
   }
+
+  @DELETE
+  @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action =
+      Actions.Table.DELETE_INGESTION_METRICS)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance",
+      notes = "Removes ingestion-related metrics from serverInstance for partition(s) under the specified table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully removed ingestion metrics."),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  public SuccessResponse removeIngestionMetrics(
+      @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType")
+      String tableNameWithType,
+      @ApiParam(value = "Instance name of the server", required = true) @PathParam("instanceName")
+      String instanceName,
+      @ApiParam(value = "Comma-separated list of partition group IDs (optional)") @QueryParam("partitionGroupId")
+      Set<Integer> partitionGroupIds,

Review Comment:
   Mark it `@Nullable`



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -1192,4 +1193,44 @@ public List<StaleSegment> getStaleSegments(
       throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR);
     }
   }
+
+  @DELETE
+  @Path("/tables/{tableNameWithType}/ingestionMetrics")

Review Comment:
   Same comments as on the controller endpoint.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
