This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f123000ca3 Batch delete deleted directory segments (#16848)
7f123000ca3 is described below

commit 7f123000ca36a95fdd5a2203d3f45473a19cf9f3
Author: 9aman <[email protected]>
AuthorDate: Fri Sep 26 15:56:32 2025 +0530

    Batch delete deleted directory segments (#16848)
    
    * Batch delete deleted directory segments
    
    * Changes to ensure async deletion
    
    * Fixing unit tests
    
    * Improve and fix test cases
    
    * Minor improvements in testing
    
    * Renaming variable to correctly capture the logic
    
    * Add support for providing a custom batch size for aged deleted segments 
deletion
    
    * Fixing the hardcoded string to a constant
    
    * Fixing failed test cases
    
    * Increase the default deletion size to 1000 from 100
    
    * Add timeout for batched deletion.
---
 .../apache/pinot/controller/ControllerConf.java    |  12 ++
 .../helix/core/SegmentDeletionManager.java         | 196 ++++++++++-----------
 .../helix/core/retention/RetentionManager.java     |   5 +-
 .../helix/core/retention/RetentionManagerTest.java |  20 ++-
 .../core/util/SegmentDeletionManagerTest.java      | 116 +++++++++---
 5 files changed, 213 insertions(+), 136 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 8e0583ddee1..7a30e12e4e2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -274,6 +274,9 @@ public class ControllerConf extends PinotConfiguration {
     public static final String UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS =
         "controller.retentionManager.untrackedSegmentsRetentionTimeInDays";
     public static final int DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS 
= 3;
+    public static final String AGED_SEGMENTS_DELETION_BATCH_SIZE =
+        "controller.retentionManager.agedSegmentsDeletionBatchSize";
+    public static final int DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE = 1000;
     public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
     public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
     public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 
60 * 60; // 1 Hour.
@@ -1237,6 +1240,15 @@ public class ControllerConf extends PinotConfiguration {
     setProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION, 
untrackedSegmentDeletionEnabled);
   }
 
+  public int getAgedSegmentsDeletionBatchSize() {
+    return 
getProperty(ControllerPeriodicTasksConf.AGED_SEGMENTS_DELETION_BATCH_SIZE,
+        ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
+  }
+
+  public void setAgedSegmentsDeletionBatchSize(int 
agedSegmentsDeletionBatchSize) {
+    setProperty(ControllerPeriodicTasksConf.AGED_SEGMENTS_DELETION_BATCH_SIZE, 
agedSegmentsDeletionBatchSize);
+  }
+
   public long getPinotTaskManagerInitialDelaySeconds() {
     return getPeriodicTaskInitialDelayInSeconds();
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 5638ebc16ce..e72ac521cec 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -54,6 +54,7 @@ import 
org.apache.pinot.core.segment.processing.lifecycle.impl.SegmentDeletionEv
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.TimeUtils;
@@ -71,14 +72,15 @@ public class SegmentDeletionManager {
   // Retention date format will be written as suffix to deleted segments under 
`Deleted_Segments` folder. for example:
   // 
`Deleted_Segments/myTable/myTable_mySegment_0__RETENTION_UNTIL__202202021200` 
to indicate that this segment
   // file will be permanently deleted after Feb 2nd 2022 12PM.
-  private static final String DELETED_SEGMENTS = "Deleted_Segments";
+  public static final String DELETED_SEGMENTS = "Deleted_Segments";
   private static final String RETENTION_UNTIL_SEPARATOR = 
"__RETENTION_UNTIL__";
   private static final String RETENTION_DATE_FORMAT_STR = "yyyyMMddHHmm";
   private static final SimpleDateFormat RETENTION_DATE_FORMAT;
   private static final String DELIMITER = "/";
 
+  public static final int NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT = 1000;
   private static final int OBJECT_DELETION_TIMEOUT = 5;
-  private static final int NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT = 100;
+  private static final int MAX_BATCH_DELETION_TIMEOUT_SECONDS = 600; // 10 
minutes
 
   static {
     RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR);
@@ -210,17 +212,6 @@ public class SegmentDeletionManager {
     }
   }
 
-  public void removeSegmentsFromStore(String tableNameWithType, List<String> 
segments) {
-    removeSegmentsFromStore(tableNameWithType, segments, null);
-  }
-
-  public void removeSegmentsFromStore(String tableNameWithType, List<String> 
segments,
-      @Nullable Long deletedSegmentsRetentionMs) {
-    for (String segment : segments) {
-      removeSegmentFromStore(tableNameWithType, segment, 
deletedSegmentsRetentionMs);
-    }
-  }
-
   public void removeSegmentsFromStoreInBatch(String tableNameWithType, 
List<String> segments,
       @Nullable Long deletedSegmentsRetentionMs) {
     if (_dataDir == null) {
@@ -270,49 +261,6 @@ public class SegmentDeletionManager {
     }
   }
 
-  private void deleteSegmentMetadataFromStore(PinotFS pinotFS, URI 
segmentFileUri, String segmentId) {
-    // Check if segment metadata exists in remote store and delete it.
-    // URI is generated from segment's location and segment name
-    try {
-      URI segmentMetadataUri = 
SegmentPushUtils.generateSegmentMetadataURI(segmentFileUri.toString(), 
segmentId);
-      if (pinotFS.exists(segmentMetadataUri)) {
-        LOGGER.info("Deleting segment metadata: {} from: {}", segmentId, 
segmentMetadataUri);
-        if (!deleteWithTimeout(pinotFS, segmentMetadataUri, true, 
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
-          LOGGER.warn("Could not delete segment metadata: {} from: {}", 
segmentId, segmentMetadataUri);
-        }
-      }
-    } catch (IOException e) {
-      LOGGER.warn("Could not delete segment metadata {} from {}", segmentId, 
segmentFileUri, e);
-    } catch (URISyntaxException e) {
-      LOGGER.warn("Could not parse segment uri {}", segmentFileUri, e);
-    }
-  }
-
-  protected void removeSegmentFromStore(String tableNameWithType, String 
segmentId,
-      @Nullable Long deletedSegmentsRetentionMs) {
-    if (_dataDir != null) {
-      long retentionMs = deletedSegmentsRetentionMs == null
-          ? _defaultDeletedSegmentsRetentionMs : deletedSegmentsRetentionMs;
-      String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-      URI fileToDeleteURI = getFileToDeleteURI(rawTableName, segmentId);
-      if (fileToDeleteURI == null) {
-        return;
-      }
-      PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme());
-      // Segment metadata in remote store is an optimization, to avoid 
downloading segment to parse metadata.
-      // This is catch all clean up to ensure that metadata is removed from 
deep store.
-      deleteSegmentMetadataFromStore(pinotFS, fileToDeleteURI, segmentId);
-      if (retentionMs <= 0) {
-        // delete the segment file instantly if retention is set to zero
-        segmentDeletion(segmentId, pinotFS, fileToDeleteURI);
-      } else {
-        moveSegmentsToDeletedDir(segmentId, deletedSegmentsRetentionMs, 
rawTableName, pinotFS, fileToDeleteURI);
-      }
-    } else {
-      LOGGER.info("dataDir is not configured, won't delete segment {} from 
disk", segmentId);
-    }
-  }
-
   private void moveSegmentsToDeletedDir(String segmentId, Long 
deletedSegmentsRetentionMs, String rawTableName,
       PinotFS pinotFS,
       URI fileToDeleteURI) {
@@ -342,13 +290,6 @@ public class SegmentDeletionManager {
     }
   }
 
-  private static void segmentDeletion(String segmentId, PinotFS pinotFS, URI 
fileToDeleteURI) {
-    if (deleteWithTimeout(pinotFS, fileToDeleteURI, true, 
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
-      LOGGER.info("Deleted segment: {} from: {}", segmentId, 
fileToDeleteURI.toString());
-    } else {
-      LOGGER.warn("Failed to delete segment: {} from: {}", segmentId, 
fileToDeleteURI.toString());
-    }
-  }
 
   /**
    * Retrieves the URI for segment deletion by checking two possible segment 
file variants in deep store.
@@ -386,11 +327,26 @@ public class SegmentDeletionManager {
       return null;
     }
   }
-
   /**
-   * Removes aged deleted segments from the deleted directory
+   * Removes aged deleted segments from the deleted directory using the 
default batch size.
+   * This method processes aged segments in batches to avoid overwhelming the 
system.
+   *
+   * @param leadControllerManager the lead controller manager to check if this 
controller is the leader for tables
    */
   public void removeAgedDeletedSegments(LeadControllerManager 
leadControllerManager) {
+    removeAgedDeletedSegments(leadControllerManager, 
NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT);
+  }
+
+  /**
+   * Removes aged deleted segments from the deleted directory with a custom 
batch size.
+   * This method asynchronously deletes segments that have exceeded their 
retention period.
+   * Only the leader controller for each table will perform the deletion to 
avoid conflicts.
+   *
+   * @param leadControllerManager the lead controller manager to check if this 
controller is the leader for tables
+   * @param agedSegmentsDeletionBatchSize the maximum number of aged segments 
to process in a single batch
+   */
+  public void removeAgedDeletedSegments(LeadControllerManager 
leadControllerManager,
+      int agedSegmentsDeletionBatchSize) {
     if (_dataDir != null) {
       URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
       PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
@@ -425,37 +381,74 @@ public class SegmentDeletionManager {
           if (leadControllerManager.isLeaderForTable(tableName)) {
             URI tableNameURI = URIUtils.getUri(deletedDirURI.toString(), 
URIUtils.encode(tableName));
             // Get files that are aged
-            final String[] targetFiles = pinotFS.listFiles(tableNameURI, 
false);
-            int numFilesDeleted = 0;
-            URI targetURI = null;
-            for (String targetFile : targetFiles) {
+            final List<FileMetadata> targetFiles = 
pinotFS.listFilesWithMetadata(tableNameURI, false);
+
+            if (targetFiles.isEmpty()) {
+              LOGGER.info("Deleting empty deleted segments directory: {} for 
table: {}", tableNameURI, tableName);
               try {
-                targetURI =
-                    URIUtils.getUri(tableNameURI.toString(), 
URIUtils.encode(URIUtils.getLastPart(targetFile)));
-                long deletionTimeMs = 
getDeletionTimeMsFromFile(targetURI.toString(), 
pinotFS.lastModified(targetURI));
-                if (System.currentTimeMillis() >= deletionTimeMs) {
-                  if (!deleteWithTimeout(pinotFS, targetURI, true, 
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
-                    LOGGER.warn("Failed to remove resource: {}", targetURI);
-                  } else {
-                    numFilesDeleted++;
-                    if (numFilesDeleted >= 
NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT) {
-                      LOGGER.info("Reached threshold of max aged segments to 
delete per attempt. Deleted: {} files "
-                          + "from directory: {}", numFilesDeleted, 
tableNameDir);
-                      break;
-                    }
-                  }
+                if (!pinotFS.delete(tableNameURI, false)) {
+                  LOGGER.info("Could not delete deleted segments directory: {} 
for table: {}", tableNameURI, tableName);
+                } else {
+                  LOGGER.info("Successfully deleted deleted segments 
directory: {} for table: {}", tableNameURI,
+                      tableName);
                 }
               } catch (Exception e) {
-                LOGGER.warn("Failed to delete uri: {} for table: {}", 
targetURI, tableName, e);
+                LOGGER.error("Exception occurred while deleting deleted 
segments directory: {} for table: {}",
+                    tableNameURI, tableName, e);
               }
+              continue;
             }
+            int numFilesScheduledForDeletion = 0;
+            URI targetURI = null;
+            List<URI> targetURIs = new ArrayList<>();
+            for (FileMetadata targetFile : targetFiles) {
+              // Some file system implementations also return the current 
table directory
+              // we do not want to delete the table directory
+              if (targetFile.isDirectory()) {
+                continue;
+              }
 
-            if (numFilesDeleted == targetFiles.length) {
-              // Delete directory if it's empty
-              if (!deleteWithTimeout(pinotFS, tableNameURI, false, 
OBJECT_DELETION_TIMEOUT, TimeUnit.SECONDS)) {
-                LOGGER.warn("Failed to remove the directory: {}", 
tableNameDir);
+              targetURI =
+                  URIUtils.getUri(tableNameURI.toString(),
+                      
URIUtils.encode(URIUtils.getLastPart(targetFile.getFilePath())));
+              long deletionTimeMs = 
getDeletionTimeMsFromFile(targetURI.toString(), 
targetFile.getLastModifiedTime());
+              if (System.currentTimeMillis() >= deletionTimeMs) {
+                numFilesScheduledForDeletion++;
+                targetURIs.add(targetURI);
+                if (numFilesScheduledForDeletion == 
agedSegmentsDeletionBatchSize) {
+                  LOGGER.info(
+                      "Reached threshold of max aged segments to schedule for 
deletion per attempt. Scheduling "
+                          + "deletion of: {} segment files "
+                          + "from directory: {}", 
numFilesScheduledForDeletion, tableNameDir);
+                  break;
+                }
               }
             }
+            try {
+              if (numFilesScheduledForDeletion > 0) {
+                LOGGER.info("Submitting request to delete: {} segment files 
from directory: {}",
+                    numFilesScheduledForDeletion, tableNameDir);
+                _executorService.submit(() -> {
+                  try {
+                    long timeoutSeconds = 
Math.min(MAX_BATCH_DELETION_TIMEOUT_SECONDS,
+                        (long) OBJECT_DELETION_TIMEOUT * targetURIs.size());
+                    if (!deleteBatchWithTimeout(pinotFS, targetURIs, false, 
timeoutSeconds, TimeUnit.SECONDS)) {
+                      LOGGER.warn("Failed to delete aged segment files from 
table: {}", tableName);
+                    } else {
+                      LOGGER.info("Successfully deleted {} aged segment files 
from table: {}",
+                          targetURIs.size(),
+                          tableName);
+                    }
+                  } catch (Exception e) {
+                    LOGGER.error("Exception occurred while deleting aged 
segments for table: {} from path: {}",
+                        tableName, tableNameURI, e);
+                  }
+                });
+              }
+            } catch (Exception e) {
+              LOGGER.warn("Failed to submit deletion task for segments in 
table: {} from path: {}", tableName,
+                  tableNameURI, e);
+            }
           }
         }
       } catch (IOException e) {
@@ -466,32 +459,33 @@ public class SegmentDeletionManager {
     }
   }
 
-  private static boolean deleteWithTimeout(PinotFS pinotFS, URI targetURI, 
boolean forceDelete, long timeout,
-      TimeUnit timeUnit) {
+  private static boolean deleteBatchWithTimeout(PinotFS pinotFS, List<URI> 
targetURIs, boolean forceDelete,
+      long timeout, TimeUnit timeUnit) {
     CompletableFuture<Boolean> deleteFuture = CompletableFuture.supplyAsync(() 
-> {
-      try {
-        return pinotFS.delete(targetURI, forceDelete);
-      } catch (IOException e) {
-        LOGGER.warn("Error while deleting resource: {}", targetURI, e);
-        return false;
-      }
-    });
-
+          try {
+            return pinotFS.deleteBatch(targetURIs, forceDelete);
+          } catch (Exception e) {
+            LOGGER.warn("Failed to batch delete segments files", e);
+            return false;
+          }
+        }
+    );
     try {
       return deleteFuture.get(timeout, timeUnit);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      LOGGER.warn("Thread was interrupted while deleting resource: {}", 
targetURI, e);
+      LOGGER.warn("Thread was interrupted while deleting resources", e);
       return false;
     } catch (TimeoutException e) {
-      LOGGER.warn("Timeout occurred while deleting resource: {}", targetURI, 
e);
+      LOGGER.warn("Timeout occurred while deleting resource", e);
       return false;
     } catch (ExecutionException e) {
-      LOGGER.warn("Exception occurred while deleting resource: {}", targetURI, 
e);
+      LOGGER.warn("Exception occurred while deleting resource", e);
       return false;
     }
   }
 
+
   private String getDeletedSegmentFileName(String fileName, long 
deletedSegmentsRetentionMs) {
     return fileName + RETENTION_UNTIL_SEPARATOR + 
RETENTION_DATE_FORMAT.format(new Date(
         System.currentTimeMillis() + deletedSegmentsRetentionMs));
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 9dd9fe11c73..3d1bd43e418 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -81,6 +81,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);
   private final boolean _untrackedSegmentDeletionEnabled;
   private final int _untrackedSegmentsRetentionTimeInDays;
+  private final int _agedSegmentsDeletionBatchSize;
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RetentionManager.class);
   private final boolean _isHybridTableRetentionStrategyEnabled;
@@ -94,6 +95,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
         controllerMetrics);
     _untrackedSegmentDeletionEnabled = 
config.getUntrackedSegmentDeletionEnabled();
     _untrackedSegmentsRetentionTimeInDays = 
config.getUntrackedSegmentsRetentionTimeInDays();
+    _agedSegmentsDeletionBatchSize = config.getAgedSegmentsDeletionBatchSize();
     _isHybridTableRetentionStrategyEnabled = 
config.isHybridTableRetentionStrategyEnabled();
     _brokerServiceHelper = brokerServiceHelper;
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", 
getIntervalInSeconds());
@@ -120,7 +122,8 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
   @Override
   protected void postprocess() {
     LOGGER.info("Removing aged deleted segments for all tables");
-    
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_leadControllerManager);
+    _pinotHelixResourceManager.getSegmentDeletionManager()
+        .removeAgedDeletedSegments(_leadControllerManager, 
_agedSegmentsDeletionBatchSize);
   }
 
   private void manageRetentionForTable(TableConfig tableConfig) {
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index d5a1f29849d..ebe829231a0 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -210,7 +210,8 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
 
     // Verify that the removeAgedDeletedSegments() method in deletion manager 
is called
-    verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager);
+    verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager,
+        
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
 
     // Verify deleteSegments is called
     verify(pinotHelixResourceManager, 
times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList());
@@ -410,7 +411,8 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
 
     // Verify that the removeAgedDeletedSegments() method in deletion manager 
is called
-    verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager);
+    verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager,
+        
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
 
     // Verify deleteSegments is called
     verify(pinotHelixResourceManager, 
times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList());
@@ -523,7 +525,9 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = 
mock(SegmentDeletionManager.class);
     // Ignore the call to SegmentDeletionManager.removeAgedDeletedSegments. we 
only test that the call is made once per
     // run of the retention manager
-    doAnswer(invocationOnMock -> 
null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager);
+    doAnswer(invocationOnMock -> null).when(deletionManager)
+        .removeAgedDeletedSegments(leadControllerManager,
+            
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
     
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
 
     // If and when PinotHelixResourceManager.deleteSegments() is invoked, make 
sure that the segments deleted
@@ -552,7 +556,9 @@ public class RetentionManagerTest {
     when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
 
     SegmentDeletionManager deletionManager = 
mock(SegmentDeletionManager.class);
-    doAnswer(invocationOnMock -> 
null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager);
+    doAnswer(invocationOnMock -> null).when(deletionManager)
+        .removeAgedDeletedSegments(leadControllerManager,
+            
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
     
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
 
     // Set up verification for deleteSegments with focus on the count and 
segment inclusion rules
@@ -615,7 +621,8 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
 
     // Verify that the removeAgedDeletedSegments() method in deletion manager 
is actually called.
-    verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager);
+    verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager,
+        
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
 
     // Verify that the deleteSegments method is actually called.
     verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), 
anyList());
@@ -652,7 +659,8 @@ public class RetentionManagerTest {
     SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
 
     // Verify that the removeAgedDeletedSegments() method in deletion manager 
is actually called.
-    verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager);
+    verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager,
+        
ControllerConf.ControllerPeriodicTasksConf.DEFAULT_AGED_SEGMENTS_DELETION_BATCH_SIZE);
 
     // Verify that the deleteSegments method is actually called.
     verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), 
anyList());
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index 3dcac7b01ea..2db31ac2049 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -34,6 +34,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
@@ -48,6 +49,7 @@ import 
org.apache.pinot.controller.helix.core.SegmentDeletionManager;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.filesystem.FileMetadata;
 import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
@@ -244,7 +246,7 @@ public class SegmentDeletionManagerTest {
     deletionManager.removeAgedDeletedSegments(leadControllerManager);
 
     // Create deleted directory
-    String deletedDirectoryPath = tempDir + File.separator + 
"Deleted_Segments";
+    String deletedDirectoryPath = tempDir + File.separator + 
SegmentDeletionManager.DELETED_SEGMENTS;
     File deletedDirectory = new File(deletedDirectoryPath);
     deletedDirectory.mkdir();
 
@@ -295,18 +297,26 @@ public class SegmentDeletionManagerTest {
     deletionManager.removeAgedDeletedSegments(leadControllerManager);
 
     // Check that only 1 day retention file is remaining
-    Assert.assertEquals(dummyDir1.list().length, 1);
+    TestUtils.waitForCondition((aVoid) -> dummyDir1.list().length == 1, 1000, 
100000,
+        "Unable to delete desired segments from dummyDir1");
+
+    // Check that empty directory has not been removed in the first run
+    TestUtils.waitForCondition((aVoid) -> dummyDir2.exists(), 1000, 100000,
+        "dummyDir2 does not exist");
 
-    // Check that empty directory has successfully been removed.
-    Assert.assertEquals(dummyDir2.exists(), false);
 
     // Check that deleted file without retention suffix is honoring 
cluster-wide retention period of 7 days.
-    Assert.assertEquals(dummyDir3.list().length, 1);
+    TestUtils.waitForCondition((aVoid) -> dummyDir3.list().length == 1, 1000, 
100000,
+        "Unable to delete desired segments from dummyDir3");
+
+    // Try to remove empty directory in the next run
+    deletionManager.removeAgedDeletedSegments(leadControllerManager);
+    Assert.assertFalse(dummyDir2.exists());
   }
 
   @Test
   public void testRemoveDeletedSegmentsForGcsPinotFS()
-      throws URISyntaxException, IOException {
+      throws URISyntaxException, IOException, InterruptedException {
     Map<String, Object> properties = new HashMap<>();
     
properties.put(CommonConstants.Controller.PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY 
+ ".class",
         LocalPinotFS.class.getName());
@@ -323,9 +333,9 @@ public class SegmentDeletionManagerTest {
     PinotFSFactory.register("fake", FakePinotFs.class.getName(), null);
     PinotFS pinotFS = PinotFSFactory.create("fake");
 
-    URI tableUri1 = new 
URI("fake://bucket/sc/managed/pinot/Deleted_Segments/table_1/");
+    URI tableUri1 = new URI("fake://bucket/sc/managed/pinot/" + 
SegmentDeletionManager.DELETED_SEGMENTS + "/table_1/");
     pinotFS.mkdir(tableUri1);
-    for (int i = 0; i < 101; i++) {
+    for (int i = 0; i < 
SegmentDeletionManager.NUM_AGED_SEGMENTS_TO_DELETE_PER_ATTEMPT + 1; i++) {
       URI segmentURIForTable =
           new URI(tableUri1.getPath() + "segment" + i + 
RETENTION_UNTIL_SEPARATOR + "201901010000");
       pinotFS.mkdir(segmentURIForTable);
@@ -335,7 +345,7 @@ public class SegmentDeletionManagerTest {
     pinotFS.mkdir(segmentURIForTable);
 
     // Create dummy files
-    URI tableUri2 = new 
URI("fake://bucket/sc/managed/pinot/Deleted_Segments/table_2/");
+    URI tableUri2 = new URI("fake://bucket/sc/managed/pinot/" + 
SegmentDeletionManager.DELETED_SEGMENTS + "/table_2/");
     URI segment1ForTable2 = new URI(tableUri2.getPath() + "segment1" + 
RETENTION_UNTIL_SEPARATOR + "201901010000");
     URI segment2ForTable2 = new URI(tableUri2.getPath() + "segment1" + 
RETENTION_UNTIL_SEPARATOR + "201801010000");
     pinotFS.mkdir(tableUri2);
@@ -343,11 +353,31 @@ public class SegmentDeletionManagerTest {
     pinotFS.mkdir(segment2ForTable2);
 
     deletionManager1.removeAgedDeletedSegments(leadControllerManager);
-    // all files should get deleted
-    Assert.assertFalse(pinotFS.exists(tableUri2));
+
+    // All files should get deleted but the directory will be deleted in the 
next run
+    TestUtils.waitForCondition((aVoid) -> {
+          try {
+            return pinotFS.listFiles(tableUri2, false).length == 0;
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }, 1000, 10000,
+        "Could not delete all the files for table_2");
+    Assert.assertTrue(pinotFS.exists(tableUri2));
 
     // One file that doesn't meet retention criteria, and another file due to 
the per attempt batch limit remains.
-    Assert.assertEquals(pinotFS.listFiles(tableUri1, false).length, 2);
+    TestUtils.waitForCondition((aVoid) -> {
+          try {
+            return pinotFS.listFiles(tableUri1, false).length == 2;
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }, 1000, 10000,
+        "100 out of 102 files could not be deleted from tableUri1 directory");
+
+    // the next run of the deletion manager should remove the empty directory 
as well.
+    deletionManager1.removeAgedDeletedSegments(leadControllerManager);
+    Assert.assertFalse(pinotFS.exists(tableUri2));
   }
 
   @Test
@@ -369,8 +399,9 @@ public class SegmentDeletionManagerTest {
     Set<String> segments = new HashSet<>(segmentsThatShouldBeDeleted());
     createTableAndSegmentFiles(tempDir, segmentsThatShouldBeDeleted());
     final File tableDir = new File(tempDir.getAbsolutePath() + File.separator 
+ TABLE_NAME);
-    final File deletedTableDir = new File(tempDir.getAbsolutePath() + 
File.separator + "Deleted_Segments"
-        + File.separator + TABLE_NAME);
+    final File deletedTableDir =
+        new File(tempDir.getAbsolutePath() + File.separator + 
SegmentDeletionManager.DELETED_SEGMENTS
+            + File.separator + TABLE_NAME);
 
     // delete the segments instantly.
     SegmentsValidationAndRetentionConfig mockValidationConfig = 
mock(SegmentsValidationAndRetentionConfig.class);
@@ -425,8 +456,9 @@ public class SegmentDeletionManagerTest {
     Set<String> segments = new HashSet<>(segmentsThatShouldBeDeleted());
     createTableAndSegmentFilesWithGZExtension(tempDir, 
segmentsThatShouldBeDeleted());
     final File tableDir = new File(tempDir.getAbsolutePath() + File.separator 
+ TABLE_NAME);
-    final File deletedTableDir = new File(tempDir.getAbsolutePath() + 
File.separator + "Deleted_Segments"
-        + File.separator + TABLE_NAME);
+    final File deletedTableDir =
+        new File(tempDir.getAbsolutePath() + File.separator + 
SegmentDeletionManager.DELETED_SEGMENTS
+            + File.separator + TABLE_NAME);
 
     // mock returning ZK Metadata for segment url
     ZNRecord znRecord1 = mock(org.apache.helix.ZNRecord.class);
@@ -493,8 +525,9 @@ public class SegmentDeletionManagerTest {
     List<String> segmentsThatShouldBeDeleted = segmentsThatShouldBeDeleted();
     createTableAndSegmentFiles(tempDir, segmentsThatShouldBeDeleted);
     final File tableDir = new File(tempDir.getAbsolutePath() + File.separator 
+ TABLE_NAME);
-    final File deletedTableDir = new File(tempDir.getAbsolutePath() + 
File.separator + "Deleted_Segments"
-        + File.separator + TABLE_NAME);
+    final File deletedTableDir =
+        new File(tempDir.getAbsolutePath() + File.separator + 
SegmentDeletionManager.DELETED_SEGMENTS
+            + File.separator + TABLE_NAME);
 
     deletionManager.removeSegmentsFromStoreInBatch(TABLE_NAME, 
segmentsThatShouldBeDeleted, 0L);
 
@@ -569,12 +602,6 @@ public class SegmentDeletionManagerTest {
       super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, 0L, 
0L);
     }
 
-    @Override
-    protected void removeSegmentFromStore(String tableName, String segmentId,
-        @Nullable Long deletedSegmentsRetentionMs) {
-      _segmentsRemovedFromStore.add(segmentId);
-    }
-
     @Override
     protected void deleteSegmentsWithDelay(String tableName, 
Collection<String> segmentIds,
         @Nullable Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
@@ -619,20 +646,39 @@ public class SegmentDeletionManagerTest {
         _tableDirs.remove(uri.getPath() + "/");
         return true;
       }
-      // remote the segment
+      // remove the segment
       String tableName = uri.getPath().substring(0, 
uri.getPath().lastIndexOf("/") + 1);
       return _tableDirs.get(tableName).remove(uri.getPath());
     }
 
+    @Override
+    public boolean deleteBatch(List<URI> segmentUris, boolean forceDelete)
+        throws IOException {
+      // the expectation here is that the batch delete call is only limited to segments.
+      URI segmentURI = segmentUris.get(0);
+      String tableName = segmentURI.getPath().substring(0, 
segmentURI.getPath().lastIndexOf("/") + 1);
+      if (_tableDirs.containsKey(tableName)) {
+        // remove all the segments from the table directory
+        segmentUris.forEach(segmentUri -> 
_tableDirs.get(tableName).remove(segmentUri.getPath()));
+        return true;
+      }
+      // the table does not exist and we return a false;
+      return false;
+    }
+
     @Override
     public boolean exists(URI fileUri) {
-      return fileUri.getPath().endsWith("Deleted_Segments") || 
_tableDirs.containsKey(fileUri.getPath() + "/");
+      String uriPath = fileUri.getPath();
+      if (uriPath.endsWith("/")) {
+        uriPath = uriPath.substring(0, uriPath.lastIndexOf("/"));
+      }
+      return uriPath.endsWith(SegmentDeletionManager.DELETED_SEGMENTS) || 
_tableDirs.containsKey(uriPath + "/");
     }
 
     @Override
     public String[] listFiles(URI fileUri, boolean recursive)
         throws IOException {
-      if (fileUri.getPath().endsWith("Deleted_Segments")) {
+      if (fileUri.getPath().endsWith(SegmentDeletionManager.DELETED_SEGMENTS)) 
{
         return _tableDirs.keySet().toArray(new String[0]);
       }
       // the call to list segments will come without the delimiter after the table name
@@ -640,9 +686,23 @@ public class SegmentDeletionManagerTest {
       return _tableDirs.get(tableName).toArray(new String[0]);
     }
 
+    @Override
+    public List<FileMetadata> listFilesWithMetadata(URI fileUri, boolean 
recursive) {
+      if (_tableDirs.containsKey(fileUri.getPath() + "/")) {
+        return _tableDirs.get(fileUri.getPath() + "/")
+            .stream()
+            .map(segmentFilePath -> new 
FileMetadata.Builder().setFilePath(segmentFilePath).build())
+            .collect(Collectors.toList());
+      }
+      return List.of();
+    }
+
     @Override
     public boolean isDirectory(URI uri) {
-      return true;
+      if (_tableDirs.containsKey(uri.getPath() + "/")) {
+        return true;
+      }
+      return uri.getPath().endsWith(SegmentDeletionManager.DELETED_SEGMENTS);
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to