This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 87f70ffe1d Adds remove ingestion-metrics API (#16045)
87f70ffe1d is described below

commit 87f70ffe1db7a5ff301fd01ab73c83fe34da433b
Author: NOOB <43700604+noob-se...@users.noreply.github.com>
AuthorDate: Tue Jun 17 00:26:53 2025 +0530

    Adds remove ingestion-metrics API (#16045)
---
 .../api/resources/PinotTableInstances.java         | 71 ++++++++++++++++++++++
 .../java/org/apache/pinot/core/auth/Actions.java   |  1 +
 .../manager/realtime/IngestionDelayTracker.java    | 16 +++++
 .../manager/realtime/RealtimeTableDataManager.java | 10 +++
 .../pinot/server/api/resources/TablesResource.java | 41 +++++++++++++
 5 files changed, 139 insertions(+)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
index 15d1ca0504..419b5f1b6f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableInstances.java
@@ -20,6 +20,8 @@ package org.apache.pinot.controller.api.resources;
 
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.BiMap;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiKeyAuthDefinition;
 import io.swagger.annotations.ApiOperation;
@@ -29,9 +31,15 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.net.URI;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import javax.inject.Inject;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -42,8 +50,13 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.exception.TableNotFoundException;
 import org.apache.pinot.common.utils.DatabaseUtils;
+import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.api.access.AccessType;
+import org.apache.pinot.controller.api.access.Authenticate;
 import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.core.auth.Actions;
@@ -186,4 +199,62 @@ public class PinotTableInstances {
       throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.NOT_FOUND);
     }
   }
+
+  @DELETE
+  @Path("/tables/{tableName}/{instanceId}/ingestionMetrics")
+  @Produces(MediaType.APPLICATION_JSON)
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.DELETE_INGESTION_METRICS)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Remove realtime ingestion metrics emitted per 
partitionId from serverInstance", notes =
+      "Removes ingestion-related metrics from serverInstance for partition(s) 
under the specified table")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully removed ingestion 
metrics."),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  public SuccessResponse removeIngestionMetrics(
+      @ApiParam(value = "Table name", required = true) @PathParam("tableName") 
String tableName,
+      @ApiParam(value = "Instance id of the server", required = true) 
@PathParam("instanceId") String instanceId,
+      @ApiParam(value = "List of Partition Ids (optional)") 
@QueryParam("partitionId") @Nullable
+      Set<Integer> partitionIds,
+      @Context HttpHeaders headers) {
+    try {
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.BAD_REQUEST);
+    }
+    String tableNameWithType =
+        
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, TableType.REALTIME, LOGGER)
+            .get(0);
+    String serverEndpoint;
+    try {
+      BiMap<String, String> dataInstanceAdminEndpoints =
+          
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(Collections.singleton(instanceId));
+      serverEndpoint = dataInstanceAdminEndpoints.get(instanceId);
+      Preconditions.checkNotNull(serverEndpoint, "Server endpoint not found 
for instance: " + instanceId);
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to get server 
endpoint for instance: " + instanceId,
+          Response.Status.BAD_REQUEST);
+    }
+    StringBuilder uriBuilder = new StringBuilder(serverEndpoint)
+        .append("/tables/")
+        .append(tableNameWithType)
+        .append("/ingestionMetrics");
+
+    if (CollectionUtils.isNotEmpty(partitionIds)) {
+      String query = partitionIds.stream()
+          .map(id -> "partitionId=" + id)
+          .collect(Collectors.joining("&"));
+      uriBuilder.append("?").append(query);
+    }
+
+    String fullUrl = uriBuilder.toString();
+    SimpleHttpResponse simpleHttpResponse;
+    try {
+      simpleHttpResponse =
+          
HttpClient.wrapAndThrowHttpException(HttpClient.getInstance().sendDeleteRequest(URI.create(fullUrl)));
+    } catch (Exception e) {
+      throw new ControllerApplicationException(LOGGER, e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    return new SuccessResponse(simpleHttpResponse.getResponse());
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java 
b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
index 6dec9b7903..000aab90c8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/auth/Actions.java
@@ -120,6 +120,7 @@ public class Actions {
     public static final String DELETE_SEGMENT = "DeleteSegment";
     public static final String DELETE_TABLE = "DeleteTable";
     public static final String DELETE_TIME_BOUNDARY = "DeleteTimeBoundary";
+    public static final String DELETE_INGESTION_METRICS = 
"DeleteIngestionMetrics";
     public static final String DISABLE_TABLE = "DisableTable";
     public static final String DOWNLOAD_SEGMENT = "DownloadSegment";
     public static final String ENABLE_TABLE = "EnableTable";
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 2b52b29f2d..a4cbef0488 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -23,6 +23,7 @@ import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import java.time.Clock;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -212,6 +213,7 @@ public class IngestionDelayTracker {
         _serverMetrics.removePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
         _serverMetrics.removePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
         _serverMetrics.removePartitionGauge(_metricName, partitionId, 
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
+        LOGGER.info("Successfully removed ingestion metrics for partition id: 
{}", partitionId);
       }
       return null;
     });
@@ -319,6 +321,20 @@ public class IngestionDelayTracker {
     removePartitionId(partitionId);
   }
 
+  /**
+   * Handles the removal of all partitions. This must be invoked when we stop serving partitions for this table in
+   * the current server.
+   *
+   * @return Set of partitionIds for which ingestion metrics were removed.
+   */
+  public Set<Integer> stopTrackingIngestionDelayForAllPartitions() {
+    // Take a snapshot of the tracked partitions and iterate over the snapshot rather than the live key set:
+    // removePartitionId() is invoked per element (and presumably mutates _ingestionInfoMap), so iterating the copy
+    // avoids modifying the collection being iterated and guarantees the returned set matches what was removed here.
+    Set<Integer> removedPartitionIds = new HashSet<>(_ingestionInfoMap.keySet());
+    for (Integer partitionId : removedPartitionIds) {
+      removePartitionId(partitionId);
+    }
+    return removedPartitionIds;
+  }
+
   /**
    * Stops tracking the partition ingestion delay, and also ignores the 
updates from the given segment. This is useful
    * when we want to stop tracking the ingestion delay for a partition when 
the segment might still be consuming, e.g.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 277fd5b910..e56e937b99 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -509,6 +509,16 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     }
   }
 
+  /**
+   * Stops tracking ingestion delay for the given partitions, or for all partitions when none are given.
+   *
+   * @param partitionIds partitions to stop tracking; null or empty means every partition
+   * @return the given {@code partitionIds} when it is non-empty, otherwise the set returned by
+   *         {@code IngestionDelayTracker#stopTrackingIngestionDelayForAllPartitions()}
+   */
+  public Set<Integer> stopTrackingPartitionIngestionDelay(@Nullable Set<Integer> partitionIds) {
+    if (!CollectionUtils.isEmpty(partitionIds)) {
+      // Explicit partition list: stop tracking each one and echo the request back.
+      for (Integer id : partitionIds) {
+        _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(id);
+      }
+      return partitionIds;
+    }
+    // No partition list supplied: delegate to the tracker to drop everything.
+    return _ingestionDelayTracker.stopTrackingIngestionDelayForAllPartitions();
+  }
+
   private void doAddConsumingSegment(String segmentName)
       throws Exception {
     SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index da822cef86..d25b4b872c 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -41,8 +41,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.inject.Named;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
@@ -1192,4 +1194,43 @@ public class TablesResource {
       throw new WebApplicationException(e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR);
     }
   }
+
+  @DELETE
+  @Path("/tables/{tableName}/ingestionMetrics")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Remove ingestion metrics for partition(s)", notes = 
"Removes ingestion-related metrics for "
+      + "the given table. If no partitionId is provided, metrics for all 
partitions hosted by this server will be "
+      + "removed.")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Successfully removed ingestion 
metrics"),
+      @ApiResponse(code = 500, message = "Internal Server Error")
+  })
+  public String removeIngestionMetrics(
+      @ApiParam(value = "Table name", required = true) @PathParam("tableName") 
String tableName,
+      @Nullable @ApiParam(value = "List of partition Ids (optional)") 
@QueryParam("partitionId")
+      Set<Integer> partitionIds,
+      @Context HttpHeaders headers) {
+    try {
+      tableName = DatabaseUtils.translateTableName(tableName, headers);
+    } catch (Exception e) {
+      throw new WebApplicationException(e.getMessage(), 
Response.Status.BAD_REQUEST);
+    }
+    String tableNameWithType = 
TableNameBuilder.REALTIME.tableNameWithType(tableName);
+    TableDataManager tableDataManager =
+        ServerResourceUtils.checkGetTableDataManager(_serverInstance, 
tableNameWithType);
+    try {
+      if (tableDataManager instanceof RealtimeTableDataManager) {
+        RealtimeTableDataManager realtimeTableDataManager = 
(RealtimeTableDataManager) tableDataManager;
+        Set<Integer> removedPartitionIds = 
realtimeTableDataManager.stopTrackingPartitionIngestionDelay(partitionIds);
+        return "Successfully removed ingestion metrics for partitions: " + 
removedPartitionIds + " in table: "
+            + tableNameWithType;
+      } else {
+        throw new WebApplicationException(
+            "TableDataManager is not RealtimeTableDataManager for table: " + 
tableNameWithType,
+            Response.Status.BAD_REQUEST);
+      }
+    } catch (Exception e) {
+      throw new WebApplicationException(e.getMessage(), 
Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to