This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new eacd6c058a  Allows segments deletion in build for pauseless tables (#15299)
eacd6c058a is described below

commit eacd6c058a4795541cd1759793734f8b5590e372
Author: 9aman <35227405+9a...@users.noreply.github.com>
AuthorDate: Thu Mar 27 19:43:32 2025 +0530

    Allows segments deletion in build for pauseless tables (#15299)
---
 .../api/resources/PinotSegmentRestletResource.java | 145 ++++++++++++++++++++-
 .../resources/PinotSegmentRestletResourceTest.java |  94 +++++++++++++
 2 files changed, 238 insertions(+), 1 deletion(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 600c75b718..a317391766 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -37,6 +37,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +65,7 @@ import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.InvalidConfigException;
@@ -75,6 +77,7 @@ import org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckRespon
 import org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
 import org.apache.pinot.controller.ControllerConf;
@@ -891,7 +894,8 @@ public class PinotSegmentRestletResource {
     return new SuccessResponse("Deleted " + numSegments + " segments from table: " + tableName);
   }
 
-  private void deleteSegmentsInternal(String tableNameWithType, List<String> segments, String retentionPeriod) {
+  private void deleteSegmentsInternal(String tableNameWithType, List<String> segments,
+      @Nullable String retentionPeriod) {
     PinotResourceManagerResponse response =
         _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments, retentionPeriod);
     if (!response.isSuccessful()) {
@@ -1175,6 +1179,145 @@ public class PinotSegmentRestletResource {
     return updateZKTimeIntervalInternal(tableNameWithType);
   }
 
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/deletePauselessSegments/{tableName}")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.DELETE_SEGMENT)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Delete segments from a pauseless enabled table", notes =
+      "Deletes segments from a pauseless-enabled table based on the provided segment names. "
+          + "For each segment provided, it identifies the partition and deletes all segments "
+          + "with sequence numbers >= the provided segment in that partition. "
+          + "When force flag is true, it bypasses checks for pauseless being enabled and table being paused. "
+          + "The retention period controls how long deleted segments are retained before permanent removal. "
+          + "It follows this precedence: input parameter → table config → cluster setting → 7d default. "
+          + "Use 0d or -1d for immediate deletion without retention.")
+  public SuccessResponse deletePauselessSegments(
+      @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType")
+      String tableNameWithType,
+      @ApiParam(value = "List of segment names. For each segment, all segments with higher sequence IDs in the same "
+          + "partition will be deleted", required = true, allowMultiple = true)
+      @QueryParam("segments") List<String> segments,
+      @ApiParam(value = "Force flag to bypass checks for pauseless being enabled and table being paused",
+          defaultValue = "false") @QueryParam("force") boolean force,
+      @Context HttpHeaders headers
+  ) {
+
+    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers);
+
+    Preconditions.checkState(TableNameBuilder.isRealtimeTableResource(tableNameWithType),
+        "Table should be a realtime table.");
+
+    // Validate input segments
+    if (segments == null || segments.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "Segment list must not be empty", Status.BAD_REQUEST);
+    }
+
+    TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType);
+
+    if (!force) {
+      // Check if pauseless is enabled
+      Preconditions.checkState(PauselessConsumptionUtils.isPauselessEnabled(tableConfig),
+          "Pauseless is not enabled for the table " + tableNameWithType);
+      // Check if the ingestion has been paused
+      Preconditions.checkState(_pinotHelixResourceManager.getRealtimeSegmentManager()
+          .getPauseStatusDetails(tableNameWithType)
+          .getPauseFlag(), "Table " + tableNameWithType + " should be paused before deleting segments.");
+    }
+
+    IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Ideal State does not exist for table " + tableNameWithType);
+
+    Set<String> idealStateSegmentsSet = idealState.getRecord().getMapFields().keySet();
+    Map<Integer, LLCSegmentName> partitionToOldestSegment =
+        getPartitionIDToOldestSegment(segments, idealStateSegmentsSet);
+    Map<Integer, LLCSegmentName> partitionIdToLatestSegment = new HashMap<>();
+    Map<Integer, Set<String>> partitionIdToSegmentsToDeleteMap =
+        getPartitionIdToSegmentsToDeleteMap(partitionToOldestSegment, idealStateSegmentsSet,
+            partitionIdToLatestSegment);
+    for (Integer partitionID : partitionToOldestSegment.keySet()) {
+      Set<String> segmentToDeleteForPartition = partitionIdToSegmentsToDeleteMap.get(partitionID);
+      LOGGER.info("Deleting : {} segments from segment: {} to segment: {} for partition: {}",
+          segmentToDeleteForPartition.size(), partitionToOldestSegment.get(partitionID),
+          partitionIdToLatestSegment.get(partitionID), partitionID);
+      deleteSegmentsInternal(tableNameWithType, new ArrayList<>(segmentToDeleteForPartition), null);
+    }
+
+    return new SuccessResponse("Successfully deleted segments for table: " + tableNameWithType);
+  }
+
+  /**
+   * Identifies segments that need to be deleted based on partition and sequence ID information.
+   *
+   * For each partition in the provided partitionToOldestSegment map, this method identifies
+   * all segments with sequence IDs greater than or equal to the oldest segment's sequence ID.
+   * It also tracks the latest segment (highest sequence ID) for each partition, which is useful
+   * for logging purposes.
+   *
+   * @param partitionToOldestSegment Map of partition IDs to their corresponding oldest segment (lowest sequence ID)
+   *                                 that serves as the threshold for deletion. All segments with sequence IDs
+   *                                 greater than or equal to this will be selected for deletion.
+   * @param idealStateSegmentsSet The segments present in the ideal state for the table
+   * @param partitionIdToLatestSegment A map that will be populated with the latest segment (highest sequence ID)
+   *                                   for each partition. This is passed by reference and modified by this method.
+   *
+   * @return A map from partition IDs to sets of segment names that should be deleted.
+   *         Each set contains all segments with sequence IDs >= the oldest segment's sequence ID
+   *         for that particular partition.
+   */
+  @VisibleForTesting
+  Map<Integer, Set<String>> getPartitionIdToSegmentsToDeleteMap(
+      Map<Integer, LLCSegmentName> partitionToOldestSegment,
+      Set<String> idealStateSegmentsSet, Map<Integer, LLCSegmentName> partitionIdToLatestSegment) {
+
+    // Find segments to delete (those with higher sequence numbers)
+    Map<Integer, Set<String>> partitionToSegmentsToDelete = new HashMap<>();
+
+    for (String segmentName : idealStateSegmentsSet) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+      int partitionId = llcSegmentName.getPartitionGroupId();
+
+      LLCSegmentName oldestSegment = partitionToOldestSegment.get(partitionId);
+      if (oldestSegment != null && oldestSegment.getSequenceNumber() <= llcSegmentName.getSequenceNumber()) {
+        partitionToSegmentsToDelete
+            .computeIfAbsent(partitionId, k -> new HashSet<>())
+            .add(segmentName);
+      }
+
+      // Track latest segment (segment with highest sequence ID)
+      LLCSegmentName currentLatest = partitionIdToLatestSegment.get(partitionId);
+      if (currentLatest == null || llcSegmentName.getSequenceNumber() > currentLatest.getSequenceNumber()) {
+        partitionIdToLatestSegment.put(partitionId, llcSegmentName);
+      }
+    }
+
+    return partitionToSegmentsToDelete;
+  }
+
+  @VisibleForTesting
+  Map<Integer, LLCSegmentName> getPartitionIDToOldestSegment(List<String> segments, Set<String> idealStateSegmentsSet) {
+    Map<Integer, LLCSegmentName> partitionToOldestSegment = new HashMap<>();
+
+    for (String segment : segments) {
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
+      Preconditions.checkState(llcSegmentName != null, "Invalid LLC segment: " + segment);
+
+      // ignore segments that are not present in the ideal state
+      if (!idealStateSegmentsSet.contains(segment)) {
+        LOGGER.warn("Segment: {} is not present in the ideal state", segment);
+        continue;
+      }
+      int partitionId = llcSegmentName.getPartitionGroupId();
+
+      LLCSegmentName currentOldest = partitionToOldestSegment.get(partitionId);
+      if (currentOldest == null || llcSegmentName.getSequenceNumber() < currentOldest.getSequenceNumber()) {
+        partitionToOldestSegment.put(partitionId, llcSegmentName);
+      }
+    }
+
+    return partitionToOldestSegment;
+  }
+
   /**
    * Internal method to update schema
    * @param tableNameWithType name of the table
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
index 392fc05bd8..a54cb3d884 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
@@ -20,9 +20,14 @@ package org.apache.pinot.controller.api.resources;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
@@ -30,6 +35,7 @@ import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -85,4 +91,92 @@ public class PinotSegmentRestletResourceTest {
       assertTrue(e.getMessage().contains("Only one segment is expected but got: [seg01, seg02]"));
     }
   }
+
+  @Test
+  public void testGetPartitionIdToSegmentsToDeleteMap() {
+    IdealState idealState = mock(IdealState.class);
+    ZNRecord znRecord = mock(ZNRecord.class);
+    String tableName = "testTable";
+    long currentTime = System.currentTimeMillis();
+    Map<String, Map<String, String>> segmentsToInstanceState = new HashMap<>();
+
+    // Add segments for partition 0
+    for (String segment : getSegmentForPartition(tableName, 0, 0, 10, currentTime)) {
+      segmentsToInstanceState.put(segment, null);
+    }
+
+    // Add segments for partition 1
+    for (String segment : getSegmentForPartition(tableName, 1, 0, 10, currentTime)) {
+      segmentsToInstanceState.put(segment, null);
+    }
+
+    // Mock response for fetching segment to instance state map
+    when(idealState.getRecord()).thenReturn(znRecord);
+    when(znRecord.getMapFields()).thenReturn(segmentsToInstanceState);
+
+    // Create the partition to oldest segment map
+    Map<Integer, LLCSegmentName> partitionToOldestSegment = Map.of(
+        0, new LLCSegmentName(tableName, 0, 3, currentTime),
+        1, new LLCSegmentName(tableName, 1, 5, currentTime)
+    );
+
+    // This map will be populated by the method
+    Map<Integer, LLCSegmentName> partitionIdToLatestSegment = new HashMap<>();
+
+    // Create the expected response map
+    Map<Integer, Set<String>> expectedResponse = new HashMap<>();
+    expectedResponse.put(0,
+        getSegmentForPartition(tableName, 0, 3, 7, currentTime).stream().collect(Collectors.toSet()));
+    expectedResponse.put(1,
+        getSegmentForPartition(tableName, 1, 5, 5, currentTime).stream().collect(Collectors.toSet()));
+
+    // Call the method and check the result
+    Map<Integer, Set<String>> result = _pinotSegmentRestletResource.getPartitionIdToSegmentsToDeleteMap(
+        partitionToOldestSegment, segmentsToInstanceState.keySet(), partitionIdToLatestSegment);
+
+    assertEquals(expectedResponse, result);
+
+    // Verify that partitionIdToLatestSegment has been populated with the latest segment for each partition
+    assertEquals(2, partitionIdToLatestSegment.size());
+    assertEquals(9, partitionIdToLatestSegment.get(0).getSequenceNumber());
+    assertEquals(9, partitionIdToLatestSegment.get(1).getSequenceNumber());
+  }
+
+  @Test
+  public void testGetPartitionIDToOldestSegment() {
+    List<String> segments = new ArrayList<>();
+    String tableName = "testTable";
+    long currentTime = System.currentTimeMillis();
+
+    // Add segments for testing
+    segments.addAll(
+        getSegmentForPartition(tableName, 0, 3, 3, currentTime)); // Segments with seq 3,4,5 for partition 0
+    segments.addAll(getSegmentForPartition(tableName, 1, 4, 2, currentTime)); // Segments with seq 4,5 for partition 1
+
+    // Only add the above segment to the ideal state segment list
+    Set<String> idealStateSegmentSet = new HashSet<>(segments);
+
+    // Add a segment from another table to this list that has lower sequence ID for the above partitions
+    segments.addAll(
+        getSegmentForPartition(tableName + "fake", 0, 1, 3, currentTime)); // Segments with seq 1,2,3 for partition 0
+
+    // Create expected result map
+    Map<Integer, LLCSegmentName> expectedResult = new HashMap<>();
+    expectedResult.put(0, new LLCSegmentName(tableName, 0, 3, currentTime));
+    expectedResult.put(1, new LLCSegmentName(tableName, 1, 4, currentTime));
+
+    // Call the method and check the result
+    Map<Integer, LLCSegmentName> result =
+        _pinotSegmentRestletResource.getPartitionIDToOldestSegment(segments, idealStateSegmentSet);
+
+    assertEquals(expectedResult, result);
+  }
+
+  private List<String> getSegmentForPartition(String tableName, int partitionID, int sequenceNumberOffset,
+      int numberOfSegments, long currentTime) {
+    List<String> segments = new ArrayList<>();
+    for (int i = sequenceNumberOffset; i < sequenceNumberOffset + numberOfSegments; i++) {
+      segments.add(new LLCSegmentName(tableName, partitionID, i, currentTime).getSegmentName());
+    }
+    return segments;
+  }
 }
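For anyone who wants to exercise the new endpoint once this change is deployed, here is a minimal client sketch. The controller address, table name, and segment names below are hypothetical; the exact URL prefix depends on how the controller mounts this resource, and any authentication headers your cluster requires are omitted. The path and query parameters are taken from the JAX-RS annotations in the diff above.

import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class DeletePauselessSegmentsExample {
  public static void main(String[] args) throws Exception {
    // Hypothetical controller address and realtime table name; adjust to your deployment.
    String controller = "http://localhost:9000";
    String table = "myTable_REALTIME";

    // One segment per partition; the endpoint deletes every segment in the same
    // partition whose sequence number is >= the one provided.
    String seg0 = URLEncoder.encode("myTable__0__3__20250327T0000Z", StandardCharsets.UTF_8);
    String seg1 = URLEncoder.encode("myTable__1__5__20250327T0000Z", StandardCharsets.UTF_8);

    // Repeated "segments" query parameters map onto the List<String> parameter;
    // force=false keeps the pauseless-enabled and table-paused checks in place.
    URI uri = URI.create(controller + "/deletePauselessSegments/" + table
        + "?segments=" + seg0 + "&segments=" + seg1 + "&force=false");

    HttpRequest request = HttpRequest.newBuilder(uri).DELETE().build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}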
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org