This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new 125c103abc  Added timeouts to the resource deletion calls from SegmentDeletionManager (#15638)
125c103abc is described below

commit 125c103abc1fe7a578920888fdccb1252f00d6ca
Author: Ragesh Rajagopalan <ragesh.rajagopa...@gmail.com>
AuthorDate: Mon Apr 28 09:00:21 2025 -0700

    Added timeouts to the resource deletion calls from SegmentDeletionManager (#15638)
---
 .../helix/core/SegmentDeletionManager.java      | 61 +++++++++++++++++-----
 .../core/util/SegmentDeletionManagerTest.java   | 19 ++++---
 2 files changed, 59 insertions(+), 21 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 217f93a84d..a87d801152 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -30,10 +30,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
@@ -74,6 +77,9 @@ public class SegmentDeletionManager {
   private static final SimpleDateFormat RETENTION_DATE_FORMAT;
   private static final String DELIMITER = "/";
 
+  private static final int OBJECT_DELETION_TIMEOUT = 5;
+  private static final int NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT = 100;
+
   static {
     RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR);
     RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getTimeZone("UTC"));
@@ -270,8 +276,8 @@ public class SegmentDeletionManager {
       try {
         URI segmentMetadataUri = SegmentPushUtils.generateSegmentMetadataURI(segmentFileUri.toString(), segmentId);
         if (pinotFS.exists(segmentMetadataUri)) {
-          LOGGER.info("Deleting segment metadata {} from {}", segmentId, segmentMetadataUri);
-          if (!pinotFS.delete(segmentMetadataUri, true)) {
+          LOGGER.info("Deleting segment metadata: {} from: {}", segmentId, segmentMetadataUri);
+          if (!deleteWithTimeout(pinotFS, segmentMetadataUri, true, OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
             LOGGER.warn("Could not delete segment metadata: {} from: {}", segmentId, segmentMetadataUri);
           }
         }
@@ -337,14 +343,10 @@ public class SegmentDeletionManager {
   }
 
   private static void segmentDeletion(String segmentId, PinotFS pinotFS, URI fileToDeleteURI) {
-    try {
-      if (pinotFS.delete(fileToDeleteURI, true)) {
-        LOGGER.info("Deleted segment {} from {}", segmentId, fileToDeleteURI.toString());
-      } else {
-        LOGGER.warn("Failed to delete segment {} from {}", segmentId, fileToDeleteURI.toString());
-      }
-    } catch (IOException e) {
-      LOGGER.warn("Could not delete segment {} from {}", segmentId, fileToDeleteURI.toString(), e);
+    if (deleteWithTimeout(pinotFS, fileToDeleteURI, true, OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
+      LOGGER.info("Deleted segment: {} from: {}", segmentId, fileToDeleteURI.toString());
+    } else {
+      LOGGER.warn("Failed to delete segment: {} from: {}", segmentId, fileToDeleteURI.toString());
     }
   }
 
@@ -432,10 +434,15 @@ public class SegmentDeletionManager {
               URIUtils.getUri(tableNameURI.toString(), URIUtils.encode(URIUtils.getLastPart(targetFile)));
           long deletionTimeMs = getDeletionTimeMsFromFile(targetURI.toString(), pinotFS.lastModified(targetURI));
           if (System.currentTimeMillis() >= deletionTimeMs) {
-            if (!pinotFS.delete(targetURI, true)) {
-              LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI);
+            if (!deleteWithTimeout(pinotFS, targetURI, true, OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
+              LOGGER.warn("Failed to remove resource: {}", targetURI);
             } else {
               numFilesDeleted++;
+              if (numFilesDeleted >= NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT) {
+                LOGGER.info("Reached threshold of max aged segments to delete per attempt. Deleted: {} files "
+                    + "from directory: {}", numFilesDeleted, tableNameDir);
+                break;
+              }
             }
           }
         } catch (Exception e) {
@@ -445,8 +452,8 @@ public class SegmentDeletionManager {
 
       if (numFilesDeleted == targetFiles.length) {
         // Delete directory if it's empty
-        if (!pinotFS.delete(tableNameURI, false)) {
-          LOGGER.warn("The directory {} cannot be removed.", tableNameDir);
+        if (!deleteWithTimeout(pinotFS, tableNameURI, false, OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
+          LOGGER.warn("Failed to remove the directory: {}", tableNameDir);
         }
       }
     }
@@ -459,6 +466,32 @@ public class SegmentDeletionManager {
     }
   }
 
+  private static boolean deleteWithTimeout(PinotFS pinotFS, URI targetURI, boolean forceDelete, long timeout,
+      TimeUnit timeUnit) {
+    CompletableFuture<Boolean> deleteFuture = CompletableFuture.supplyAsync(() -> {
+      try {
+        return pinotFS.delete(targetURI, forceDelete);
+      } catch (IOException e) {
+        LOGGER.warn("Error while deleting resource: {}", targetURI, e);
+        return false;
+      }
+    });
+
+    try {
+      return deleteFuture.get(timeout, timeUnit);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Thread was interrupted while deleting resource: {}", targetURI, e);
+      return false;
+    } catch (TimeoutException e) {
+      LOGGER.warn("Timeout occurred while deleting resource: {}", targetURI, e);
+      return false;
+    } catch (ExecutionException e) {
+      LOGGER.warn("Exception occurred while deleting resource: {}", targetURI, e);
+      return false;
+    }
+  }
+
   private String getDeletedSegmentFileName(String fileName, long deletedSegmentsRetentionMs) {
     return fileName + RETENTION_UNTIL_SEPARATOR + RETENTION_DATE_FORMAT.format(new Date(
         System.currentTimeMillis() + deletedSegmentsRetentionMs));
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index c810151c5c..cf1a091409 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -324,13 +324,17 @@ public class SegmentDeletionManagerTest {
     PinotFS pinotFS = PinotFSFactory.create("fake");
 
     URI tableUri1 = new URI("fake://bucket/sc/managed/pinot/Deleted_Segments/table_1/");
-    URI segment1ForTable1 = new URI(tableUri1.getPath() + "segment1" + RETENTION_UNTIL_SEPARATOR + "201901010000");
-    URI segment2ForTable1 = new URI(tableUri1.getPath() + "segment2" + RETENTION_UNTIL_SEPARATOR + "210001010000");
 
     pinotFS.mkdir(tableUri1);
-    pinotFS.mkdir(segment1ForTable1);
-    pinotFS.mkdir(segment2ForTable1);
-    // Create dummy files
+    for (int i = 0; i < 101; i++) {
+      URI segmentURIForTable =
+          new URI(tableUri1.getPath() + "segment" + i + RETENTION_UNTIL_SEPARATOR + "201901010000");
+      pinotFS.mkdir(segmentURIForTable);
+    }
+    // Add a segment that will not be deleted as it won't meet the retention age criteria.
+    URI segmentURIForTable = new URI(tableUri1.getPath() + "segment2" + RETENTION_UNTIL_SEPARATOR + "210001010000");
+    pinotFS.mkdir(segmentURIForTable);
+    // Create dummy files
     URI tableUri2 = new URI("fake://bucket/sc/managed/pinot/Deleted_Segments/table_2/");
     URI segment1ForTable2 = new URI(tableUri2.getPath() + "segment1" + RETENTION_UNTIL_SEPARATOR + "201901010000");
     URI segment2ForTable2 = new URI(tableUri2.getPath() + "segment1" + RETENTION_UNTIL_SEPARATOR + "201801010000");
@@ -341,8 +345,9 @@ public class SegmentDeletionManagerTest {
     deletionManager1.removeAgedDeletedSegments(leadControllerManager);
     // all files should get deleted
     Assert.assertFalse(pinotFS.exists(tableUri2));
-    // only one file that is beyond retention should exist
-    Assert.assertEquals(pinotFS.listFiles(tableUri1, false).length, 1);
+
+    // One file that doesn't meet retention criteria, and another file due to the per attempt batch limit remains.
+    Assert.assertEquals(pinotFS.listFiles(tableUri1, false).length, 2);
   }
 
   @Test

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
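A note on the pattern for anyone reading the diff: the new deleteWithTimeout helper moves the blocking PinotFS.delete call onto an asynchronous task and bounds the wait with CompletableFuture.get(timeout, unit), so a hung object-store call no longer stalls the deletion loop indefinitely. The stand-alone sketch below illustrates the same shape using only JDK classes; the class name, the blockingDelete stand-in, and the simulated latency are assumptions for illustration and are not part of the Pinot code.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DeleteWithTimeoutSketch {

  // Stand-in for a blocking PinotFS.delete(uri, force) call that may hang against a slow object store.
  private static boolean blockingDelete(String target) throws InterruptedException {
    Thread.sleep(100); // simulate remote I/O latency
    return true;
  }

  // Run the blocking delete asynchronously and wait at most `timeout`, returning false on
  // timeout, interruption, or failure -- the same shape as the deleteWithTimeout helper in the diff.
  static boolean deleteWithTimeout(String target, long timeout, TimeUnit unit) {
    CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> {
      try {
        return blockingDelete(target);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    });
    try {
      return future.get(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } catch (TimeoutException | ExecutionException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // A generous budget lets the simulated delete finish; a tiny budget forces the timeout path.
    System.out.println(deleteWithTimeout("fake://bucket/segment1", 5, TimeUnit.SECONDS));      // true
    System.out.println(deleteWithTimeout("fake://bucket/segment1", 1, TimeUnit.MILLISECONDS)); // false
  }
}

One design point worth noting: supplyAsync without an explicit executor schedules the work on the JDK's default async pool, so if many deletions time out concurrently the abandoned tasks keep occupying those threads until the underlying call returns.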