Jackie-Jiang commented on code in PR #16045: URL: https://github.com/apache/pinot/pull/16045#discussion_r2146237825
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java: ########## @@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); } } + + @DELETE + @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics") Review Comment: We usually use `tables` as prefix. We don't need table type, as it only applies to `REALTIME` table. ```suggestion @Path("/tables/{tableName}/{instanceId}/ingestionMetrics") ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java: ########## @@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); } } + + @DELETE + @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics") + @Produces(MediaType.APPLICATION_JSON) + @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = + Actions.Table.DELETE_INGESTION_METRICS) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance", + notes = "Removes ingestion-related metrics from serverInstance for partition(s) under the specified table") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully removed ingestion metrics."), + @ApiResponse(code = 500, message = "Internal Server Error") + }) + public SuccessResponse removeIngestionMetrics( + @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") + String tableNameWithType, + @ApiParam(value = "Instance name of the server", required = true) @PathParam("instanceName") + String instanceName, + @ApiParam(value = "Comma-separated list of partition group IDs (optional)") @QueryParam("partitionGroupId") Review 
Comment: Does it take a comma-separated list, or does it have to be specified as multiple parameters, such as `partitionId=1&partitionId=2`? ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java: ########## @@ -319,6 +321,21 @@ public void stopTrackingPartitionIngestionDelay(int partitionId) { removePartitionId(partitionId); } + /** + * Handles all partition removal event. This must be invoked when we stop serving partitions for this table in the + * current server. + * + * @return Set of partitionIds for which ingestion metrics were removed. + */ + public Set<Integer> stopTrackingPartitionIngestionDelay() { + Set<Integer> removedPartitionIds = new HashSet<>(); Review Comment: I feel it is safer to first initialize `removedPartitionIds = new HashSet<>(_ingestionInfoMap.keySet())` to avoid iterating and modifying the same map at the same time, even though a concurrent map can probably handle it ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java: ########## @@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); } } + + @DELETE + @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics") + @Produces(MediaType.APPLICATION_JSON) + @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = + Actions.Table.DELETE_INGESTION_METRICS) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance", + notes = "Removes ingestion-related metrics from serverInstance for partition(s) under the specified table") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully removed ingestion metrics."), + @ApiResponse(code = 500, message = "Internal Server Error") + }) + public SuccessResponse removeIngestionMetrics( + 
@ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") + String tableNameWithType, + @ApiParam(value = "Instance name of the server", required = true) @PathParam("instanceName") + String instanceName, + @ApiParam(value = "Comma-separated list of partition group IDs (optional)") @QueryParam("partitionGroupId") + Set<Integer> partitionGroupIds, + @Context HttpHeaders headers) { + try { + tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST); + } + if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + throw new ControllerApplicationException(LOGGER, "Table " + tableNameWithType + " should be a realtime table.", + Response.Status.BAD_REQUEST); + } + String serverEndpoint; + try { + BiMap<String, String> dataInstanceAdminEndpoints = + _pinotHelixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(instanceName)); + serverEndpoint = dataInstanceAdminEndpoints.get(instanceName); + Preconditions.checkNotNull(serverEndpoint, "Server endpoint not found for instance: " + instanceName); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Failed to get server endpoint for instance: " + instanceName, + Response.Status.INTERNAL_SERVER_ERROR); + } + StringBuilder uriBuilder = new StringBuilder(serverEndpoint) + .append("/tables/") + .append(tableNameWithType) + .append("/ingestionMetrics"); + + if (partitionGroupIds != null && !partitionGroupIds.isEmpty()) { Review Comment: (minor) ```suggestion if (CollectionUtils.isNotEmpty(partitionIds)) { ``` ########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java: ########## @@ -319,6 +321,21 @@ public void stopTrackingPartitionIngestionDelay(int partitionId) { removePartitionId(partitionId); } + /** + * Handles all partition removal event. 
This must be invoked when we stop serving partitions for this table in the + * current server. + * + * @return Set of partitionIds for which ingestion metrics were removed. + */ + public Set<Integer> stopTrackingPartitionIngestionDelay() { Review Comment: ```suggestion public Set<Integer> stopTrackingIngestionDelayForAllPartitions() { ``` ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java: ########## @@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); } } + + @DELETE + @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics") + @Produces(MediaType.APPLICATION_JSON) + @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = + Actions.Table.DELETE_INGESTION_METRICS) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance", + notes = "Removes ingestion-related metrics from serverInstance for partition(s) under the specified table") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully removed ingestion metrics."), + @ApiResponse(code = 500, message = "Internal Server Error") + }) + public SuccessResponse removeIngestionMetrics( + @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") + String tableNameWithType, + @ApiParam(value = "Instance name of the server", required = true) @PathParam("instanceName") + String instanceName, + @ApiParam(value = "Comma-separated list of partition group IDs (optional)") @QueryParam("partitionGroupId") + Set<Integer> partitionGroupIds, + @Context HttpHeaders headers) { + try { + tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.BAD_REQUEST); + } + if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) { + throw new ControllerApplicationException(LOGGER, "Table " + tableNameWithType + " should be a realtime table.", + Response.Status.BAD_REQUEST); + } + String serverEndpoint; + try { + BiMap<String, String> dataInstanceAdminEndpoints = + _pinotHelixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(instanceName)); + serverEndpoint = dataInstanceAdminEndpoints.get(instanceName); + Preconditions.checkNotNull(serverEndpoint, "Server endpoint not found for instance: " + instanceName); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Failed to get server endpoint for instance: " + instanceName, + Response.Status.INTERNAL_SERVER_ERROR); Review Comment: Should we count this as bad request? ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java: ########## @@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); } } + + @DELETE + @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics") + @Produces(MediaType.APPLICATION_JSON) + @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = + Actions.Table.DELETE_INGESTION_METRICS) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance", Review Comment: Let's call it `partitionId`. 
I believe `partitionGroupId` was introduced for historical reasons (probably for the high-level consumer, which has already been removed), so let's use `partitionId` going forward for conciseness ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java: ########## @@ -186,4 +198,66 @@ public Map<String, List<InstanceInfo>> getLiveBrokers(@Context HttpHeaders heade throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); } } + + @DELETE + @Path("/table/{tableNameWithType}/{instanceName}/ingestionMetrics") + @Produces(MediaType.APPLICATION_JSON) + @Authorize(targetType = TargetType.TABLE, paramName = "tableNameWithType", action = + Actions.Table.DELETE_INGESTION_METRICS) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionGroupID from serverInstance", + notes = "Removes ingestion-related metrics from serverInstance for partition(s) under the specified table") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully removed ingestion metrics."), + @ApiResponse(code = 500, message = "Internal Server Error") + }) + public SuccessResponse removeIngestionMetrics( + @ApiParam(value = "Table name with type", required = true) @PathParam("tableNameWithType") + String tableNameWithType, + @ApiParam(value = "Instance name of the server", required = true) @PathParam("instanceName") + String instanceName, + @ApiParam(value = "Comma-separated list of partition group IDs (optional)") @QueryParam("partitionGroupId") + Set<Integer> partitionGroupIds, Review Comment: Mark it `@Nullable` ########## pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java: ########## @@ -1192,4 +1193,44 @@ public List<StaleSegment> getStaleSegments( throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR); } } + + @DELETE + @Path("/tables/{tableNameWithType}/ingestionMetrics") Review 
Comment: The same comments as for the controller endpoint apply here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org