noob-se7en commented on code in PR #15299:
URL: https://github.com/apache/pinot/pull/15299#discussion_r2002827185


##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -1175,6 +1179,136 @@ public SuccessResponse updateTimeIntervalZK(
     return updateZKTimeIntervalInternal(tableNameWithType);
   }
 
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/deletePauselessSegments/{tableName}")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.DELETE_SEGMENT)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Delete segments from a pauseless enabled table", 
notes =
+      "Deletes segments from a pauseless-enabled table based on the provided 
segment names. "
+          + "For each segment provided, it identifies the partition and 
deletes all segments "
+          + "with sequence numbers >= the provided segment in that partition. "
+          + "When force flag is true, it bypasses checks for pauseless being 
enabled and table being paused. "
+          + "The retention period controls how long deleted segments are 
retained before permanent removal. "
+          + "It follows this precedence: input parameter → table config → 
cluster setting → 7d default. "
+          + "Use 0d or -1d for immediate deletion without retention.")
+  public SuccessResponse deletePauselessSegments(
+      @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType")
+      String tableNameWithType,
+      @ApiParam(value = "List of segment names. For each segment, all segments 
with higher sequence IDs in the same "
+          + "partition will be deleted", required = true, allowMultiple = true)
+      @QueryParam("segments") List<String> segments,
+      @ApiParam(value = "Force flag to bypass checks for pauseless being 
enabled and table being paused",
+          defaultValue = "false") @QueryParam("force") boolean force,
+      @Context HttpHeaders headers
+  ) {
+
+    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
+
+    
Preconditions.checkState(TableNameBuilder.isRealtimeTableResource(tableNameWithType),
+        "Table should be a realtime table.");
+
+    // Validate input segments
+    if (segments == null || segments.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "Segment list must not 
be empty", Status.BAD_REQUEST);
+    }
+
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+
+    if (!force) {
+      // Check if pauseless is enabled
+      
Preconditions.checkState(PauselessConsumptionUtils.isPauselessEnabled(tableConfig),
+          "Pauseless is not enabled for the table " + tableNameWithType);
+      // Check if the ingestion has been paused
+      
Preconditions.checkState(_pinotHelixResourceManager.getRealtimeSegmentManager()
+          .getPauseStatusDetails(tableNameWithType)
+          .getPauseFlag(), "Table " + tableNameWithType + " should be paused 
before deleting segments.");
+    }
+
+    IdealState idealState = 
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Ideal State does not exist 
for table " + tableNameWithType);
+
+    Map<Integer, LLCSegmentName> partitionToOldestSegment = 
getPartitionIDToOldestSegment(segments);
+    Map<Integer, LLCSegmentName> partitionIdToLatestSegment = new HashMap<>();
+    Map<Integer, Set<String>> partitionIdToSegmentsToDeleteMap =
+        getPartitionIdToSegmentsToDeleteMap(partitionToOldestSegment, 
idealState, partitionIdToLatestSegment);
+    for (Integer partitionID : partitionToOldestSegment.keySet()) {
+      Set<String> segmentToDeleteForPartition = 
partitionIdToSegmentsToDeleteMap.get(partitionID);
+      LOGGER.info("Deleting : {} segments from segment: {} to segment: {} for 
partition: {}",
+          segmentToDeleteForPartition.size(), 
partitionToOldestSegment.get(partitionID),
+          partitionIdToLatestSegment.get(partitionID), partitionID);
+      deleteSegmentsInternal(tableNameWithType, new 
ArrayList<>(segmentToDeleteForPartition), null);
+    }
+
+    return new SuccessResponse("Successfully deleted segments for table: " + 
tableNameWithType);
+  }
+
+  /**
+   * Identifies segments that need to be deleted based on partition and 
sequence ID information.
+   *
+   * For each partition in the provided partitionToOldestSegment map, this 
method identifies
+   * all segments with sequence IDs greater than or equal to the oldest 
segment's sequence ID.
+   * It also tracks the latest segment (highest sequence ID) for each 
partition, which is useful
+   * for logging purposes.
+   *
+   * @param partitionToOldestSegment Map of partition IDs to their 
corresponding oldest segment (lowest sequence ID)
+   *                                that serves as the threshold for deletion. 
All segments with sequence IDs
+   *                                greater than or equal to this will be 
selected for deletion.
+   * @param idealState The table's ideal state which contains information 
about all existing segments.
+   * @param partitionIdToLatestSegment A map that will be populated with the 
latest segment (highest sequence ID)
+   *                                  for each partition. This is passed by 
reference and modified by this method.
+   *
+   * @return A map from partition IDs to sets of segment names that should be 
deleted.
+   *         Each set contains all segments with sequence IDs >= the oldest 
segment's sequence ID
+   *         for that particular partition.
+   */
+  @VisibleForTesting
+  Map<Integer, Set<String>> getPartitionIdToSegmentsToDeleteMap(
+      Map<Integer, LLCSegmentName> partitionToOldestSegment,
+      IdealState idealState, Map<Integer, LLCSegmentName> 
partitionIdToLatestSegment) {
+
+    // Find segments to delete (those with higher sequence numbers)
+    Map<Integer, Set<String>> partitionToSegmentsToDelete = new HashMap<>();
+    Map<String, Map<String, String>> segmentsToInstanceState = 
idealState.getRecord().getMapFields();
+
+    for (String segmentName : segmentsToInstanceState.keySet()) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+      int partitionId = llcSegmentName.getPartitionGroupId();
+
+      LLCSegmentName oldestSegment = partitionToOldestSegment.get(partitionId);
+      if (oldestSegment != null && oldestSegment.getSequenceNumber() <= 
llcSegmentName.getSequenceNumber()) {
+        partitionToSegmentsToDelete
+            .computeIfAbsent(partitionId, k -> new HashSet<>())
+            .add(segmentName);
+      }
+
+      // Track latest segment (segment with highest sequence ID)
+      LLCSegmentName currentLatest = 
partitionIdToLatestSegment.get(partitionId);
+      if (currentLatest == null || llcSegmentName.getSequenceNumber() > 
currentLatest.getSequenceNumber()) {
+        partitionIdToLatestSegment.put(partitionId, llcSegmentName);
+      }
+    }
+
+    return partitionToSegmentsToDelete;
+  }
+
+  @VisibleForTesting
+  Map<Integer, LLCSegmentName> getPartitionIDToOldestSegment(List<String> 
segments) {
+    Map<Integer, LLCSegmentName> partitionToOldestSegment = new HashMap<>();
+
+    for (String segment : segments) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segment);

Review Comment:
   Let's add an NPE check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to