This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 87f70ffe1d Adds remove ingestion-metrics API (#16045) 87f70ffe1d is described below commit 87f70ffe1db7a5ff301fd01ab73c83fe34da433b Author: NOOB <43700604+noob-se...@users.noreply.github.com> AuthorDate: Tue Jun 17 00:26:53 2025 +0530 Adds remove ingestion-metrics API (#16045) --- .../api/resources/PinotTableInstances.java | 71 ++++++++++++++++++++++ .../java/org/apache/pinot/core/auth/Actions.java | 1 + .../manager/realtime/IngestionDelayTracker.java | 16 +++++ .../manager/realtime/RealtimeTableDataManager.java | 10 +++ .../pinot/server/api/resources/TablesResource.java | 41 +++++++++++++ 5 files changed, 139 insertions(+) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java index 15d1ca0504..419b5f1b6f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java @@ -20,6 +20,8 @@ package org.apache.pinot.controller.api.resources; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.collect.BiMap; import io.swagger.annotations.Api; import io.swagger.annotations.ApiKeyAuthDefinition; import io.swagger.annotations.ApiOperation; @@ -29,9 +31,15 @@ import io.swagger.annotations.ApiResponses; import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; +import java.net.URI; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.inject.Inject; +import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.Path; @@ -42,8 +50,13 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.commons.collections.CollectionUtils; import org.apache.pinot.common.exception.TableNotFoundException; import org.apache.pinot.common.utils.DatabaseUtils; +import org.apache.pinot.common.utils.SimpleHttpResponse; +import org.apache.pinot.common.utils.http.HttpClient; +import org.apache.pinot.controller.api.access.AccessType; +import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.core.auth.Actions; @@ -186,4 +199,62 @@ public class PinotTableInstances { throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.NOT_FOUND); } } + + @DELETE + @Path("/tables/{tableName}/{instanceId}/ingestionMetrics") + @Produces(MediaType.APPLICATION_JSON) + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.DELETE_INGESTION_METRICS) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Remove realtime ingestion metrics emitted per partitionId from serverInstance", notes = + "Removes ingestion-related metrics from serverInstance for partition(s) under the specified table") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully removed ingestion metrics."), + @ApiResponse(code = 500, message = "Internal Server Error") + }) + public SuccessResponse removeIngestionMetrics( + @ApiParam(value = "Table name", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "Instance id of the server", required = true) @PathParam("instanceId") String instanceId, + @ApiParam(value = "List of Partition Ids (optional)") @QueryParam("partitionId") @Nullable + Set<Integer> partitionIds, + @Context HttpHeaders headers) { + try { + tableName = DatabaseUtils.translateTableName(tableName, headers); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.BAD_REQUEST); + } + String tableNameWithType = + ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, TableType.REALTIME, LOGGER) + .get(0); + String serverEndpoint; + try { + BiMap<String, String> dataInstanceAdminEndpoints = + _pinotHelixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(instanceId)); + serverEndpoint = dataInstanceAdminEndpoints.get(instanceId); + Preconditions.checkNotNull(serverEndpoint, "Server endpoint not found for instance: " + instanceId); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, "Failed to get server endpoint for instance: " + instanceId, + Response.Status.BAD_REQUEST); + } + StringBuilder uriBuilder = new StringBuilder(serverEndpoint) + .append("/tables/") + .append(tableNameWithType) + .append("/ingestionMetrics"); + + if (CollectionUtils.isNotEmpty(partitionIds)) { + String query = partitionIds.stream() + .map(id -> "partitionId=" + id) + .collect(Collectors.joining("&")); + uriBuilder.append("?").append(query); + } + + String fullUrl = uriBuilder.toString(); + SimpleHttpResponse simpleHttpResponse; + try { + simpleHttpResponse = + HttpClient.wrapAndThrowHttpException(HttpClient.getInstance().sendDeleteRequest(URI.create(fullUrl))); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR); + } + return new SuccessResponse(simpleHttpResponse.getResponse()); + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java index 6dec9b7903..000aab90c8 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java @@ -120,6 +120,7 @@ public class Actions { public static final String DELETE_SEGMENT = "DeleteSegment"; public static final String DELETE_TABLE = "DeleteTable"; public static final String DELETE_TIME_BOUNDARY = "DeleteTimeBoundary"; + public static final String DELETE_INGESTION_METRICS = "DeleteIngestionMetrics"; public static final String DISABLE_TABLE = "DisableTable"; public static final String DOWNLOAD_SEGMENT = "DownloadSegment"; public static final String ENABLE_TABLE = "EnableTable"; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index 2b52b29f2d..a4cbef0488 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -23,6 +23,7 @@ import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import java.time.Clock; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -212,6 +213,7 @@ public class IngestionDelayTracker { _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG); _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET); _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET); + LOGGER.info("Successfully removed ingestion metrics for partition id: {}", partitionId); } return null; }); @@ -319,6 +321,20 @@ public class IngestionDelayTracker { removePartitionId(partitionId); } + /** + * Handles all partition removal event. This must be invoked when we stop serving partitions for this table in the + * current server. + * + * @return Set of partitionIds for which ingestion metrics were removed. + */ + public Set<Integer> stopTrackingIngestionDelayForAllPartitions() { + Set<Integer> removedPartitionIds = new HashSet<>(_ingestionInfoMap.keySet()); + for (Integer partitionId : _ingestionInfoMap.keySet()) { + removePartitionId(partitionId); + } + return removedPartitionIds; + } + /** * Stops tracking the partition ingestion delay, and also ignores the updates from the given segment. This is useful * when we want to stop tracking the ingestion delay for a partition when the segment might still be consuming, e.g. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 277fd5b910..e56e937b99 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -509,6 +509,16 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } } + public Set<Integer> stopTrackingPartitionIngestionDelay(@Nullable Set<Integer> partitionIds) { + if (CollectionUtils.isEmpty(partitionIds)) { + return _ingestionDelayTracker.stopTrackingIngestionDelayForAllPartitions(); + } + for (Integer partitionId: partitionIds) { + _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionId); + } + return partitionIds; + } + private void doAddConsumingSegment(String segmentName) throws Exception { SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index da822cef86..d25b4b872c 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -41,8 +41,10 @@ import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; +import javax.annotation.Nullable; import javax.inject.Inject; import javax.inject.Named; +import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; import javax.ws.rs.GET; @@ -1192,4 +1194,43 @@ public class TablesResource { throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR); } } + + @DELETE + @Path("/tables/{tableName}/ingestionMetrics") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Remove ingestion metrics for partition(s)", notes = "Removes ingestion-related metrics for " + + "the given table. If no partitionId is provided, metrics for all partitions hosted by this server will be " + + "removed.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully removed ingestion metrics"), + @ApiResponse(code = 500, message = "Internal Server Error") + }) + public String removeIngestionMetrics( + @ApiParam(value = "Table name", required = true) @PathParam("tableName") String tableName, + @Nullable @ApiParam(value = "List of partition Ids (optional)") @QueryParam("partitionId") + Set<Integer> partitionIds, + @Context HttpHeaders headers) { + try { + tableName = DatabaseUtils.translateTableName(tableName, headers); + } catch (Exception e) { + throw new WebApplicationException(e.getMessage(), Response.Status.BAD_REQUEST); + } + String tableNameWithType = TableNameBuilder.REALTIME.tableNameWithType(tableName); + TableDataManager tableDataManager = + ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType); + try { + if (tableDataManager instanceof RealtimeTableDataManager) { + RealtimeTableDataManager realtimeTableDataManager = (RealtimeTableDataManager) tableDataManager; + Set<Integer> removedPartitionIds = realtimeTableDataManager.stopTrackingPartitionIngestionDelay(partitionIds); + return "Successfully removed ingestion metrics for partitions: " + removedPartitionIds + " in table: " + + tableNameWithType; + } else { + throw new WebApplicationException( + "TableDataManager is not RealtimeTableDataManager for table: " + tableNameWithType, + Response.Status.BAD_REQUEST); + } + } catch (Exception e) { + throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org