This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new eacd6c058a Allows segments deletion in build for pauseless tables (#15299)
eacd6c058a is described below

commit eacd6c058a4795541cd1759793734f8b5590e372
Author: 9aman <35227405+9a...@users.noreply.github.com>
AuthorDate: Thu Mar 27 19:43:32 2025 +0530

    Allows segments deletion in build for pauseless tables (#15299)
---
 .../api/resources/PinotSegmentRestletResource.java | 145 ++++++++++++++++++++-
 .../resources/PinotSegmentRestletResourceTest.java |  94 +++++++++++++
 2 files changed, 238 insertions(+), 1 deletion(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 600c75b718..a317391766 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -37,6 +37,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,6 +65,7 @@ import javax.ws.rs.core.Response.Status;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.exception.InvalidConfigException;
@@ -75,6 +77,7 @@ import org.apache.pinot.common.restlet.resources.ServerSegmentsReloadCheckRespon
 import org.apache.pinot.common.restlet.resources.TableSegmentsReloadCheckResponse;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.common.utils.UploadedRealtimeSegmentName;
 import org.apache.pinot.controller.ControllerConf;
@@ -891,7 +894,8 @@ public class PinotSegmentRestletResource {
     return new SuccessResponse("Deleted " + numSegments + " segments from 
table: " + tableName);
   }
 
-  private void deleteSegmentsInternal(String tableNameWithType, List<String> 
segments, String retentionPeriod) {
+  private void deleteSegmentsInternal(String tableNameWithType, List<String> 
segments,
+      @Nullable String retentionPeriod) {
     PinotResourceManagerResponse response = 
_pinotHelixResourceManager.deleteSegments(tableNameWithType, segments,
         retentionPeriod);
     if (!response.isSuccessful()) {
@@ -1175,6 +1179,145 @@ public class PinotSegmentRestletResource {
     return updateZKTimeIntervalInternal(tableNameWithType);
   }
 
+  @DELETE
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/deletePauselessSegments/{tableName}")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = 
Actions.Table.DELETE_SEGMENT)
+  @Authenticate(AccessType.DELETE)
+  @ApiOperation(value = "Delete segments from a pauseless enabled table", 
notes =
+      "Deletes segments from a pauseless-enabled table based on the provided 
segment names. "
+          + "For each segment provided, it identifies the partition and 
deletes all segments "
+          + "with sequence numbers >= the provided segment in that partition. "
+          + "When force flag is true, it bypasses checks for pauseless being 
enabled and table being paused. "
+          + "The retention period controls how long deleted segments are 
retained before permanent removal. "
+          + "It follows this precedence: input parameter → table config → 
cluster setting → 7d default. "
+          + "Use 0d or -1d for immediate deletion without retention.")
+  public SuccessResponse deletePauselessSegments(
+      @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType")
+      String tableNameWithType,
+      @ApiParam(value = "List of segment names. For each segment, all segments 
with higher sequence IDs in the same "
+          + "partition will be deleted", required = true, allowMultiple = true)
+      @QueryParam("segments") List<String> segments,
+      @ApiParam(value = "Force flag to bypass checks for pauseless being 
enabled and table being paused",
+          defaultValue = "false") @QueryParam("force") boolean force,
+      @Context HttpHeaders headers
+  ) {
+
+    tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, 
headers);
+
+    
Preconditions.checkState(TableNameBuilder.isRealtimeTableResource(tableNameWithType),
+        "Table should be a realtime table.");
+
+    // Validate input segments
+    if (segments == null || segments.isEmpty()) {
+      throw new ControllerApplicationException(LOGGER, "Segment list must not 
be empty", Status.BAD_REQUEST);
+    }
+
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+
+    if (!force) {
+      // Check if pauseless is enabled
+      
Preconditions.checkState(PauselessConsumptionUtils.isPauselessEnabled(tableConfig),
+          "Pauseless is not enabled for the table " + tableNameWithType);
+      // Check if the ingestion has been paused
+      
Preconditions.checkState(_pinotHelixResourceManager.getRealtimeSegmentManager()
+          .getPauseStatusDetails(tableNameWithType)
+          .getPauseFlag(), "Table " + tableNameWithType + " should be paused 
before deleting segments.");
+    }
+
+    IdealState idealState = 
_pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Ideal State does not exist 
for table " + tableNameWithType);
+
+    Set<String> idealStateSegmentsSet = 
idealState.getRecord().getMapFields().keySet();
+    Map<Integer, LLCSegmentName> partitionToOldestSegment =
+        getPartitionIDToOldestSegment(segments, idealStateSegmentsSet);
+    Map<Integer, LLCSegmentName> partitionIdToLatestSegment = new HashMap<>();
+    Map<Integer, Set<String>> partitionIdToSegmentsToDeleteMap =
+        getPartitionIdToSegmentsToDeleteMap(partitionToOldestSegment, 
idealStateSegmentsSet,
+            partitionIdToLatestSegment);
+    for (Integer partitionID : partitionToOldestSegment.keySet()) {
+      Set<String> segmentToDeleteForPartition = 
partitionIdToSegmentsToDeleteMap.get(partitionID);
+      LOGGER.info("Deleting : {} segments from segment: {} to segment: {} for 
partition: {}",
+          segmentToDeleteForPartition.size(), 
partitionToOldestSegment.get(partitionID),
+          partitionIdToLatestSegment.get(partitionID), partitionID);
+      deleteSegmentsInternal(tableNameWithType, new 
ArrayList<>(segmentToDeleteForPartition), null);
+    }
+
+    return new SuccessResponse("Successfully deleted segments for table: " + 
tableNameWithType);
+  }
+
+  /**
+   * Identifies segments that need to be deleted based on partition and 
sequence ID information.
+   *
+   * For each partition in the provided partitionToOldestSegment map, this 
method identifies
+   * all segments with sequence IDs greater than or equal to the oldest 
segment's sequence ID.
+   * It also tracks the latest segment (highest sequence ID) for each 
partition, which is useful
+   * for logging purposes.
+   *
+   * @param partitionToOldestSegment Map of partition IDs to their 
corresponding oldest segment (lowest sequence ID)
+   *                                that serves as the threshold for deletion. 
All segments with sequence IDs
+   *                                greater than or equal to this will be 
selected for deletion.
+   * @param idealStateSegmentsSet The segments present in the ideal state for 
the table
+   * @param partitionIdToLatestSegment A map that will be populated with the 
latest segment (highest sequence ID)
+   *                                  for each partition. This is passed by 
reference and modified by this method.
+   *
+   * @return A map from partition IDs to sets of segment names that should be 
deleted.
+   *         Each set contains all segments with sequence IDs >= the oldest 
segment's sequence ID
+   *         for that particular partition.
+   */
+  @VisibleForTesting
+  Map<Integer, Set<String>> getPartitionIdToSegmentsToDeleteMap(
+      Map<Integer, LLCSegmentName> partitionToOldestSegment,
+      Set<String> idealStateSegmentsSet, Map<Integer, LLCSegmentName> 
partitionIdToLatestSegment) {
+
+    // Find segments to delete (those with higher sequence numbers)
+    Map<Integer, Set<String>> partitionToSegmentsToDelete = new HashMap<>();
+
+    for (String segmentName : idealStateSegmentsSet) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+      int partitionId = llcSegmentName.getPartitionGroupId();
+
+      LLCSegmentName oldestSegment = partitionToOldestSegment.get(partitionId);
+      if (oldestSegment != null && oldestSegment.getSequenceNumber() <= 
llcSegmentName.getSequenceNumber()) {
+        partitionToSegmentsToDelete
+            .computeIfAbsent(partitionId, k -> new HashSet<>())
+            .add(segmentName);
+      }
+
+      // Track latest segment (segment with highest sequence ID)
+      LLCSegmentName currentLatest = 
partitionIdToLatestSegment.get(partitionId);
+      if (currentLatest == null || llcSegmentName.getSequenceNumber() > 
currentLatest.getSequenceNumber()) {
+        partitionIdToLatestSegment.put(partitionId, llcSegmentName);
+      }
+    }
+
+    return partitionToSegmentsToDelete;
+  }
+
+  @VisibleForTesting
+  Map<Integer, LLCSegmentName> getPartitionIDToOldestSegment(List<String> 
segments, Set<String> idealStateSegmentsSet) {
+    Map<Integer, LLCSegmentName> partitionToOldestSegment = new HashMap<>();
+
+    for (String segment : segments) {
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
+      Preconditions.checkState(llcSegmentName != null, "Invalid LLC segment: " 
+ segment);
+
+      // ignore segments that are not present in the ideal state
+      if (!idealStateSegmentsSet.contains(segment)) {
+        LOGGER.warn("Segment: {} is not present in the ideal state", segment);
+        continue;
+      }
+      int partitionId = llcSegmentName.getPartitionGroupId();
+
+      LLCSegmentName currentOldest = partitionToOldestSegment.get(partitionId);
+      if (currentOldest == null || llcSegmentName.getSequenceNumber() < 
currentOldest.getSequenceNumber()) {
+        partitionToOldestSegment.put(partitionId, llcSegmentName);
+      }
+    }
+
+    return partitionToOldestSegment;
+  }
+
   /**
    * Internal method to update schema
    * @param tableNameWithType  name of the table
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
index 392fc05bd8..a54cb3d884 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
@@ -20,9 +20,14 @@ package org.apache.pinot.controller.api.resources;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.mockito.InjectMocks;
 import org.mockito.Mock;
@@ -30,6 +35,7 @@ import org.mockito.MockitoAnnotations;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -85,4 +91,92 @@ public class PinotSegmentRestletResourceTest {
       assertTrue(e.getMessage().contains("Only one segment is expected but 
got: [seg01, seg02]"));
     }
   }
+
+  @Test
+  public void testGetPartitionIdToSegmentsToDeleteMap() {
+    IdealState idealState = mock(IdealState.class);
+    ZNRecord znRecord = mock(ZNRecord.class);
+    String tableName = "testTable";
+    long currentTime = System.currentTimeMillis();
+    Map<String, Map<String, String>> segmentsToInstanceState = new HashMap<>();
+
+    // Add segments for partition 0
+    for (String segment : getSegmentForPartition(tableName, 0, 0, 10, 
currentTime)) {
+      segmentsToInstanceState.put(segment, null);
+    }
+
+    // Add segments for partition 1
+    for (String segment : getSegmentForPartition(tableName, 1, 0, 10, 
currentTime)) {
+      segmentsToInstanceState.put(segment, null);
+    }
+
+    // Mock response for fetching segment to instance state map
+    when(idealState.getRecord()).thenReturn(znRecord);
+    when(znRecord.getMapFields()).thenReturn(segmentsToInstanceState);
+
+    // Create the partition to oldest segment map
+    Map<Integer, LLCSegmentName> partitionToOldestSegment = Map.of(
+        0, new LLCSegmentName(tableName, 0, 3, currentTime),
+        1, new LLCSegmentName(tableName, 1, 5, currentTime)
+    );
+
+    // This map will be populated by the method
+    Map<Integer, LLCSegmentName> partitionIdToLatestSegment = new HashMap<>();
+
+    // Create the expected response map
+    Map<Integer, Set<String>> expectedResponse = new HashMap<>();
+    expectedResponse.put(0,
+        getSegmentForPartition(tableName, 0, 3, 7, 
currentTime).stream().collect(Collectors.toSet()));
+    expectedResponse.put(1,
+        getSegmentForPartition(tableName, 1, 5, 5, 
currentTime).stream().collect(Collectors.toSet()));
+
+    // Call the method and check the result
+    Map<Integer, Set<String>> result = 
_pinotSegmentRestletResource.getPartitionIdToSegmentsToDeleteMap(
+        partitionToOldestSegment, segmentsToInstanceState.keySet(), 
partitionIdToLatestSegment);
+
+    assertEquals(expectedResponse, result);
+
+    // Verify that partitionIdToLatestSegment has been populated with the 
latest segment for each partition
+    assertEquals(2, partitionIdToLatestSegment.size());
+    assertEquals(9, partitionIdToLatestSegment.get(0).getSequenceNumber());
+    assertEquals(9, partitionIdToLatestSegment.get(1).getSequenceNumber());
+  }
+
+  @Test
+  public void testGetPartitionIDToOldestSegment() {
+    List<String> segments = new ArrayList<>();
+    String tableName = "testTable";
+    long currentTime = System.currentTimeMillis();
+
+    // Add segments for testing
+    segments.addAll(getSegmentForPartition(tableName, 0, 3, 3, currentTime)); 
// Segments with seq 3,4,5 for partition 0
+    segments.addAll(getSegmentForPartition(tableName, 1, 4, 2, currentTime)); 
// Segments with seq 4,5 for partition 1
+
+    // Only add the above segment to the ideal state segment list
+    Set<String> idealStateSegmentSet = new HashSet<>(segments);
+
+    // Add a segment from another table to this list that has lower sequence 
ID for the above partitions
+    segments.addAll(
+        getSegmentForPartition(tableName + "fake", 0, 1, 3, currentTime)); // 
Segments with seq 1,2,3 for partition 0
+
+    // Create expected result map
+    Map<Integer, LLCSegmentName> expectedResult = new HashMap<>();
+    expectedResult.put(0, new LLCSegmentName(tableName, 0, 3, currentTime));
+    expectedResult.put(1, new LLCSegmentName(tableName, 1, 4, currentTime));
+
+    // Call the method and check the result
+    Map<Integer, LLCSegmentName> result =
+        _pinotSegmentRestletResource.getPartitionIDToOldestSegment(segments, 
idealStateSegmentSet);
+
+    assertEquals(expectedResult, result);
+  }
+
+  private List<String> getSegmentForPartition(String tableName, int 
partitionID, int sequenceNumberOffset,
+      int numberOfSegments, long currentTime) {
+    List<String> segments = new ArrayList<>();
+    for (int i = sequenceNumberOffset; i < sequenceNumberOffset + 
numberOfSegments; i++) {
+      segments.add(new LLCSegmentName(tableName, partitionID, i, 
currentTime).getSegmentName());
+    }
+    return segments;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to