This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 125c103abc Added timeouts to the resource deletion calls from SegmentDeletionManager (#15638)
125c103abc is described below

commit 125c103abc1fe7a578920888fdccb1252f00d6ca
Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com>
AuthorDate: Mon Apr 28 09:00:21 2025 -0700

    Added timeouts to the resource deletion calls from SegmentDeletionManager (#15638)
---
 .../helix/core/SegmentDeletionManager.java         | 61 +++++++++++++++++-----
 .../core/util/SegmentDeletionManagerTest.java      | 19 ++++---
 2 files changed, 59 insertions(+), 21 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 217f93a84d..a87d801152 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -30,10 +30,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
@@ -74,6 +77,9 @@ public class SegmentDeletionManager {
   private static final SimpleDateFormat RETENTION_DATE_FORMAT;
   private static final String DELIMITER = "/";
 
+  private static final int OBJECT_DELETION_TIMEOUT = 5;
+  private static final int NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT = 100;
+
   static {
     RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR);
     RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -270,8 +276,8 @@ public class SegmentDeletionManager {
     try {
       URI segmentMetadataUri = 
SegmentPushUtils.generateSegmentMetadataURI(segmentFileUri.toString(), 
segmentId);
       if (pinotFS.exists(segmentMetadataUri)) {
-        LOGGER.info("Deleting segment metadata {} from {}", segmentId, 
segmentMetadataUri);
-        if (!pinotFS.delete(segmentMetadataUri, true)) {
+        LOGGER.info("Deleting segment metadata: {} from: {}", segmentId, 
segmentMetadataUri);
+        if (!deleteWithTimeout(pinotFS, segmentMetadataUri, true, 
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
           LOGGER.warn("Could not delete segment metadata: {} from: {}", 
segmentId, segmentMetadataUri);
         }
       }
@@ -337,14 +343,10 @@ public class SegmentDeletionManager {
   }
 
   private static void segmentDeletion(String segmentId, PinotFS pinotFS, URI 
fileToDeleteURI) {
-    try {
-      if (pinotFS.delete(fileToDeleteURI, true)) {
-        LOGGER.info("Deleted segment {} from {}", segmentId, 
fileToDeleteURI.toString());
-      } else {
-        LOGGER.warn("Failed to delete segment {} from {}", segmentId, 
fileToDeleteURI.toString());
-      }
-    } catch (IOException e) {
-      LOGGER.warn("Could not delete segment {} from {}", segmentId, 
fileToDeleteURI.toString(), e);
+    if (deleteWithTimeout(pinotFS, fileToDeleteURI, true, 
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
+      LOGGER.info("Deleted segment: {} from: {}", segmentId, 
fileToDeleteURI.toString());
+    } else {
+      LOGGER.warn("Failed to delete segment: {} from: {}", segmentId, 
fileToDeleteURI.toString());
     }
   }
 
@@ -432,10 +434,15 @@ public class SegmentDeletionManager {
                     URIUtils.getUri(tableNameURI.toString(), 
URIUtils.encode(URIUtils.getLastPart(targetFile)));
                 long deletionTimeMs = 
getDeletionTimeMsFromFile(targetURI.toString(), 
pinotFS.lastModified(targetURI));
                 if (System.currentTimeMillis() >= deletionTimeMs) {
-                  if (!pinotFS.delete(targetURI, true)) {
-                    LOGGER.warn("Cannot remove file {} from deleted 
directory.", targetURI);
+                  if (!deleteWithTimeout(pinotFS, targetURI, true, 
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
+                    LOGGER.warn("Failed to remove resource: {}", targetURI);
                   } else {
                     numFilesDeleted++;
+                    if (numFilesDeleted >= 
NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT) {
+                      LOGGER.info("Reached threshold of max aged segments to 
delete per attempt. Deleted: {} files "
+                          + "from directory: {}", numFilesDeleted, 
tableNameDir);
+                      break;
+                    }
                   }
                 }
               } catch (Exception e) {
@@ -445,8 +452,8 @@ public class SegmentDeletionManager {
 
             if (numFilesDeleted == targetFiles.length) {
               // Delete directory if it's empty
-              if (!pinotFS.delete(tableNameURI, false)) {
-                LOGGER.warn("The directory {} cannot be removed.", 
tableNameDir);
+              if (!deleteWithTimeout(pinotFS, tableNameURI, false, 
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
+                LOGGER.warn("Failed to remove the directory: {}", 
tableNameDir);
               }
             }
           }
@@ -459,6 +466,32 @@ public class SegmentDeletionManager {
     }
   }
 
+  private static boolean deleteWithTimeout(PinotFS pinotFS, URI targetURI, 
boolean forceDelete, long timeout,
+      TimeUnit timeUnit) {
+    CompletableFuture<Boolean> deleteFuture = CompletableFuture.supplyAsync(() 
-> {
+      try {
+        return pinotFS.delete(targetURI, forceDelete);
+      } catch (IOException e) {
+        LOGGER.warn("Error while deleting resource: {}", targetURI, e);
+        return false;
+      }
+    });
+
+    try {
+      return deleteFuture.get(timeout, timeUnit);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Thread was interrupted while deleting resource: {}", 
targetURI, e);
+      return false;
+    } catch (TimeoutException e) {
+      LOGGER.warn("Timeout occurred while deleting resource: {}", targetURI, 
e);
+      return false;
+    } catch (ExecutionException e) {
+      LOGGER.warn("Exception occurred while deleting resource: {}", targetURI, 
e);
+      return false;
+    }
+  }
+
   private String getDeletedSegmentFileName(String fileName, long 
deletedSegmentsRetentionMs) {
     return fileName + RETENTION_UNTIL_SEPARATOR + 
RETENTION_DATE_FORMAT.format(new Date(
         System.currentTimeMillis() + deletedSegmentsRetentionMs));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index c810151c5c..cf1a091409 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -324,13 +324,17 @@ public class SegmentDeletionManagerTest {
     PinotFS pinotFS = PinotFSFactory.create("fake");
 
     URI tableUri1 = new 
URI("fake://bucket/sc/managed/pinot/Deleted_Segments/table_1/");
-    URI segment1ForTable1 = new URI(tableUri1.getPath() + "segment1" + 
RETENTION_UNTIL_SEPARATOR + "201901010000");
-    URI segment2ForTable1 = new URI(tableUri1.getPath() + "segment2" + 
RETENTION_UNTIL_SEPARATOR + "210001010000");
     pinotFS.mkdir(tableUri1);
-    pinotFS.mkdir(segment1ForTable1);
-    pinotFS.mkdir(segment2ForTable1);
-    // Create dummy files
+    for (int i = 0; i < 101; i++) {
+      URI segmentURIForTable =
+          new URI(tableUri1.getPath() + "segment" + i + 
RETENTION_UNTIL_SEPARATOR + "201901010000");
+      pinotFS.mkdir(segmentURIForTable);
+    }
+    // Add a segment that will not be deleted as it won't meet the retention 
age criteria.
+    URI segmentURIForTable = new URI(tableUri1.getPath() + "segment2" + 
RETENTION_UNTIL_SEPARATOR + "210001010000");
+    pinotFS.mkdir(segmentURIForTable);
 
+    // Create dummy files
     URI tableUri2 = new 
URI("fake://bucket/sc/managed/pinot/Deleted_Segments/table_2/");
     URI segment1ForTable2 = new URI(tableUri2.getPath() + "segment1" + 
RETENTION_UNTIL_SEPARATOR + "201901010000");
     URI segment2ForTable2 = new URI(tableUri2.getPath() + "segment1" + 
RETENTION_UNTIL_SEPARATOR + "201801010000");
@@ -341,8 +345,9 @@ public class SegmentDeletionManagerTest {
     deletionManager1.removeAgedDeletedSegments(leadControllerManager);
     // all files should get deleted
     Assert.assertFalse(pinotFS.exists(tableUri2));
-    // only one file that is beyond retention should exist
-    Assert.assertEquals(pinotFS.listFiles(tableUri1, false).length, 1);
+
+    // One file that doesn't meet retention criteria, and another file due to 
the per attempt batch limit remains.
+    Assert.assertEquals(pinotFS.listFiles(tableUri1, false).length, 2);
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to