noob-se7en commented on code in PR #15299: URL: https://github.com/apache/pinot/pull/15299#discussion_r2002827185
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -1175,6 +1179,136 @@ public SuccessResponse updateTimeIntervalZK( return updateZKTimeIntervalInternal(tableNameWithType); } + @DELETE + @Produces(MediaType.APPLICATION_JSON) + @Path("/deletePauselessSegments/{tableName}") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.DELETE_SEGMENT) + @Authenticate(AccessType.DELETE) + @ApiOperation(value = "Delete segments from a pauseless enabled table", notes = + "Deletes segments from a pauseless-enabled table based on the provided segment names. " + + "For each segment provided, it identifies the partition and deletes all segments " + + "with sequence numbers >= the provided segment in that partition. " + + "When force flag is true, it bypasses checks for pauseless being enabled and table being paused. " + + "The retention period controls how long deleted segments are retained before permanent removal. " + + "It follows this precedence: input parameter → table config → cluster setting → 7d default. " + + "Use 0d or -1d for immediate deletion without retention.") + public SuccessResponse deletePauselessSegments( + @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") + String tableNameWithType, + @ApiParam(value = "List of segment names. 
For each segment, all segments with higher sequence IDs in the same " + + "partition will be deleted", required = true, allowMultiple = true) + @QueryParam("segments") List<String> segments, + @ApiParam(value = "Force flag to bypass checks for pauseless being enabled and table being paused", + defaultValue = "false") @QueryParam("force") boolean force, + @Context HttpHeaders headers + ) { + + tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); + + Preconditions.checkState(TableNameBuilder.isRealtimeTableResource(tableNameWithType), + "Table should be a realtime table."); + + // Validate input segments + if (segments == null || segments.isEmpty()) { + throw new ControllerApplicationException(LOGGER, "Segment list must not be empty", Status.BAD_REQUEST); + } + + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + + if (!force) { + // Check if pauseless is enabled + Preconditions.checkState(PauselessConsumptionUtils.isPauselessEnabled(tableConfig), + "Pauseless is not enabled for the table " + tableNameWithType); + // Check if the ingestion has been paused + Preconditions.checkState(_pinotHelixResourceManager.getRealtimeSegmentManager() + .getPauseStatusDetails(tableNameWithType) + .getPauseFlag(), "Table " + tableNameWithType + " should be paused before deleting segments."); + } + + IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Ideal State does not exist for table " + tableNameWithType); + + Map<Integer, LLCSegmentName> partitionToOldestSegment = getPartitionIDToOldestSegment(segments); + Map<Integer, LLCSegmentName> partitionIdToLatestSegment = new HashMap<>(); + Map<Integer, Set<String>> partitionIdToSegmentsToDeleteMap = + getPartitionIdToSegmentsToDeleteMap(partitionToOldestSegment, idealState, partitionIdToLatestSegment); + for (Integer partitionID : partitionToOldestSegment.keySet()) { + Set<String> 
segmentToDeleteForPartition = partitionIdToSegmentsToDeleteMap.get(partitionID); + LOGGER.info("Deleting : {} segments from segment: {} to segment: {} for partition: {}", + segmentToDeleteForPartition.size(), partitionToOldestSegment.get(partitionID), + partitionIdToLatestSegment.get(partitionID), partitionID); + deleteSegmentsInternal(tableNameWithType, new ArrayList<>(segmentToDeleteForPartition), null); + } + + return new SuccessResponse("Successfully deleted segments for table: " + tableNameWithType); + } + + /** + * Identifies segments that need to be deleted based on partition and sequence ID information. + * + * For each partition in the provided partitionToOldestSegment map, this method identifies + * all segments with sequence IDs greater than or equal to the oldest segment's sequence ID. + * It also tracks the latest segment (highest sequence ID) for each partition, which is useful + * for logging purposes. + * + * @param partitionToOldestSegment Map of partition IDs to their corresponding oldest segment (lowest sequence ID) + * that serves as the threshold for deletion. All segments with sequence IDs + * greater than or equal to this will be selected for deletion. + * @param idealState The table's ideal state which contains information about all existing segments. + * @param partitionIdToLatestSegment A map that will be populated with the latest segment (highest sequence ID) + * for each partition. This is passed by reference and modified by this method. + * + * @return A map from partition IDs to sets of segment names that should be deleted. + * Each set contains all segments with sequence IDs >= the oldest segment's sequence ID + * for that particular partition. 
+ */ + @VisibleForTesting + Map<Integer, Set<String>> getPartitionIdToSegmentsToDeleteMap( + Map<Integer, LLCSegmentName> partitionToOldestSegment, + IdealState idealState, Map<Integer, LLCSegmentName> partitionIdToLatestSegment) { + + // Find segments to delete (those with higher sequence numbers) + Map<Integer, Set<String>> partitionToSegmentsToDelete = new HashMap<>(); + Map<String, Map<String, String>> segmentsToInstanceState = idealState.getRecord().getMapFields(); + + for (String segmentName : segmentsToInstanceState.keySet()) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + int partitionId = llcSegmentName.getPartitionGroupId(); + + LLCSegmentName oldestSegment = partitionToOldestSegment.get(partitionId); + if (oldestSegment != null && oldestSegment.getSequenceNumber() <= llcSegmentName.getSequenceNumber()) { + partitionToSegmentsToDelete + .computeIfAbsent(partitionId, k -> new HashSet<>()) + .add(segmentName); + } + + // Track latest segment (segment with highest sequence ID) + LLCSegmentName currentLatest = partitionIdToLatestSegment.get(partitionId); + if (currentLatest == null || llcSegmentName.getSequenceNumber() > currentLatest.getSequenceNumber()) { + partitionIdToLatestSegment.put(partitionId, llcSegmentName); + } + } + + return partitionToSegmentsToDelete; + } + + @VisibleForTesting + Map<Integer, LLCSegmentName> getPartitionIDToOldestSegment(List<String> segments) { + Map<Integer, LLCSegmentName> partitionToOldestSegment = new HashMap<>(); + + for (String segment : segments) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segment); Review Comment: Let's add an NPE check here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org For additional commands, e-mail: commits-help@pinot.apache.org