This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 68260f9870 Add support for orphan segment cleanup (#15142)
68260f9870 is described below

commit 68260f98705e15a4005bdcb8ea4d454736aec652
Author: 9aman <35227405+9a...@users.noreply.github.com>
AuthorDate: Tue Mar 18 22:42:50 2025 +0530

    Add support for orphan segment cleanup (#15142)
    
    * Add support for orphan segment cleanup
    
    * Add unit tests to the code
    
    * Minor refactoring
    
    * Fixing checkstyle violations
    
    * Add support for realtime tables
    
    * Fix test failures
    
    * Adding metrics, improving logs for better debuggability
    
    * Reducing code repetition
    
    * Add support for providing batch size for number of untracked segments to 
be deleted in a single run of RetentionManager
    
    * Add test cases to test batch sizes
    
    * Provide additional comments for test class
    
    * Fixing linting issues
    
    * Adding a controller config to enable/disable deletion of untracked 
segments from deepstore during retention manager run
---
 .../pinot/common/metrics/ControllerGauge.java      |   6 +-
 .../apache/pinot/controller/ControllerConf.java    |  13 +
 .../helix/core/SegmentDeletionManager.java         |   4 +-
 .../helix/core/retention/RetentionManager.java     | 162 +++++++++-
 .../core/retention/strategy/RetentionStrategy.java |  13 +-
 .../retention/strategy/TimeRetentionStrategy.java  |  14 +-
 .../helix/core/retention/RetentionManagerTest.java | 348 ++++++++++++++++++++-
 .../SegmentsValidationAndRetentionConfig.java      |  10 +
 8 files changed, 538 insertions(+), 32 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
index 40e2d09b4f..4777573604 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ControllerGauge.java
@@ -190,7 +190,11 @@ public enum ControllerGauge implements 
AbstractMetrics.Gauge {
   REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress", 
true),
 
   // Resource utilization is within limits or not for a table
-  RESOURCE_UTILIZATION_LIMIT_EXCEEDED("ResourceUtilizationLimitExceeded", 
false);
+  RESOURCE_UTILIZATION_LIMIT_EXCEEDED("ResourceUtilizationLimitExceeded", 
false),
+
+  // The number of segments in deepstore that do not have corresponding 
metadata in ZooKeeper.
+  // These segments are untracked and should be considered for deletion based 
on retention policies.
+  UNTRACKED_SEGMENTS_COUNT("untrackedSegmentsCount", false);
 
   private final String _gaugeName;
   private final String _unit;
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index d3396cd3a7..384371e2e8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -241,6 +241,10 @@ public class ControllerConf extends PinotConfiguration {
     public static final String TMP_SEGMENT_RETENTION_IN_SECONDS =
         "controller.realtime.segment.tmpFileRetentionInSeconds";
 
+    // Enables the deletion of untracked segments during the retention manager 
run.
+    // Untracked segments are those that exist in deep store but have no 
corresponding entry in the ZK property store.
+    public static final String ENABLE_UNTRACKED_SEGMENT_DELETION =
+        "controller.retentionManager.untrackedSegmentDeletionEnabled";
     public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
     public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
     public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 
60 * 60; // 1 Hour.
@@ -1081,6 +1085,15 @@ public class ControllerConf extends PinotConfiguration {
         
ControllerPeriodicTasksConf.DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND);
   }
 
+  public boolean getUntrackedSegmentDeletionEnabled() {
+    return 
getProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION, 
false);
+  }
+
+  public void setUntrackedSegmentDeletionEnabled(boolean 
untrackedSegmentDeletionEnabled) {
+    setProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION, 
untrackedSegmentDeletionEnabled);
+  }
+
+
   public long getPinotTaskManagerInitialDelaySeconds() {
     return getPeriodicTaskInitialDelayInSeconds();
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index e5ab692fc9..c001042f9a 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -222,7 +222,9 @@ public class SegmentDeletionManager {
       URI segmentMetadataUri = 
SegmentPushUtils.generateSegmentMetadataURI(segmentFileUri.toString(), 
segmentId);
       if (pinotFS.exists(segmentMetadataUri)) {
         LOGGER.info("Deleting segment metadata {} from {}", segmentId, 
segmentMetadataUri);
-        pinotFS.delete(segmentMetadataUri, true);
+        if (!pinotFS.delete(segmentMetadataUri, true)) {
+          LOGGER.warn("Could not delete segment metadata: {} from: {}", 
segmentId, segmentMetadataUri);
+        }
       }
     } catch (IOException e) {
       LOGGER.warn("Could not delete segment metadata {} from {}", segmentId, 
segmentFileUri, e);
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 9bd8b9f62f..5b365d7768 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -18,18 +18,26 @@
  */
 package org.apache.pinot.controller.helix.core.retention;
 
+import java.io.IOException;
+import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.logging.log4j.util.Strings;
 import org.apache.pinot.common.lineage.SegmentLineage;
 import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.metrics.ControllerGauge;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.TarCompressionUtils;
+import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -39,6 +47,9 @@ import 
org.apache.pinot.controller.helix.core.retention.strategy.TimeRetentionSt
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.filesystem.FileMetadata;
+import org.apache.pinot.spi.filesystem.PinotFS;
+import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -55,7 +66,9 @@ import org.slf4j.LoggerFactory;
  */
 public class RetentionManager extends ControllerPeriodicTask<Void> {
   public static final long OLD_LLC_SEGMENTS_RETENTION_IN_MILLIS = 
TimeUnit.DAYS.toMillis(5L);
+  public static final int DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE = 100;
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);
+  private final boolean _untrackedSegmentDeletionEnabled;
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RetentionManager.class);
 
@@ -64,7 +77,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     super("RetentionManager", 
config.getRetentionControllerFrequencyInSeconds(),
         config.getRetentionManagerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
         controllerMetrics);
-
+    _untrackedSegmentDeletionEnabled = 
config.getUntrackedSegmentDeletionEnabled();
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", 
getIntervalInSeconds());
   }
 
@@ -105,6 +118,10 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     }
     String retentionTimeUnit = validationConfig.getRetentionTimeUnit();
     String retentionTimeValue = validationConfig.getRetentionTimeValue();
+    int untrackedSegmentsDeletionBatchSize =
+        validationConfig.getUntrackedSegmentsDeletionBatchSize() != null ? 
Integer.parseInt(
+            validationConfig.getUntrackedSegmentsDeletionBatchSize()) : 
DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE;
+
     RetentionStrategy retentionStrategy;
     try {
       retentionStrategy = new 
TimeRetentionStrategy(TimeUnit.valueOf(retentionTimeUnit.toUpperCase()),
@@ -117,15 +134,23 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
 
     // Scan all segment ZK metadata and purge segments if necessary
     if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
-      manageRetentionForOfflineTable(tableNameWithType, retentionStrategy);
+      manageRetentionForOfflineTable(tableNameWithType, retentionStrategy, 
untrackedSegmentsDeletionBatchSize);
     } else {
-      manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy);
+      manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy, 
untrackedSegmentsDeletionBatchSize);
     }
   }
 
-  private void manageRetentionForOfflineTable(String offlineTableName, 
RetentionStrategy retentionStrategy) {
-    List<String> segmentsToDelete = new ArrayList<>();
-    for (SegmentZKMetadata segmentZKMetadata : 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName)) {
+  private void manageRetentionForOfflineTable(String offlineTableName, 
RetentionStrategy retentionStrategy,
+      int untrackedSegmentsDeletionBatchSize) {
+    List<SegmentZKMetadata> segmentZKMetadataList = 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
+
+    // fetch those segments that are beyond the retention period and don't 
have an entry in ZK i.e.
+    // SegmentZkMetadata is missing for those segments
+    List<String> segmentsToDelete =
+        getSegmentsToDeleteFromDeepstore(offlineTableName, retentionStrategy, 
segmentZKMetadataList,
+            untrackedSegmentsDeletionBatchSize);
+
+    for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
       if (retentionStrategy.isPurgeable(offlineTableName, segmentZKMetadata)) {
         segmentsToDelete.add(segmentZKMetadata.getSegmentName());
       }
@@ -136,11 +161,20 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     }
   }
 
-  private void manageRetentionForRealtimeTable(String realtimeTableName, 
RetentionStrategy retentionStrategy) {
-    List<String> segmentsToDelete = new ArrayList<>();
+  private void manageRetentionForRealtimeTable(String realtimeTableName, 
RetentionStrategy retentionStrategy,
+      int untrackedSegmentsDeletionBatchSize) {
+    List<SegmentZKMetadata> segmentZKMetadataList = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
+
+    // fetch those segments that are beyond the retention period and don't 
have an entry in ZK i.e.
+    // SegmentZkMetadata is missing for those segments
+    List<String> segmentsToDelete =
+        getSegmentsToDeleteFromDeepstore(realtimeTableName, retentionStrategy, 
segmentZKMetadataList,
+            untrackedSegmentsDeletionBatchSize);
+
     IdealState idealState = _pinotHelixResourceManager.getHelixAdmin()
         
.getResourceIdealState(_pinotHelixResourceManager.getHelixClusterName(), 
realtimeTableName);
-    for (SegmentZKMetadata segmentZKMetadata : 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName)) {
+
+    for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
       String segmentName = segmentZKMetadata.getSegmentName();
       if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS) {
         // Delete old LLC segment that hangs around. Do not delete segment 
that are current since there may be a race
@@ -189,6 +223,116 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     }
   }
 
+  private List<String> getSegmentsToDeleteFromDeepstore(String 
tableNameWithType, RetentionStrategy retentionStrategy,
+      List<SegmentZKMetadata> segmentZKMetadataList, int 
untrackedSegmentsDeletionBatchSize) {
+    List<String> segmentsToDelete = new ArrayList<>();
+
+    if (!_untrackedSegmentDeletionEnabled) {
+      LOGGER.info(
+          "Not scanning deep store for untracked segments for table: {}", 
tableNameWithType);
+      return segmentsToDelete;
+    }
+
+    if (untrackedSegmentsDeletionBatchSize <= 0) {
+      // return an empty list in case untracked segment deletion batch size is 
configured <= 0 in table config
+      LOGGER.info(
+          "Not scanning deep store for untracked segments for table: {} as 
untrackedSegmentsDeletionBatchSize is set "
+              + "to: {}",
+          tableNameWithType, untrackedSegmentsDeletionBatchSize);
+      return segmentsToDelete;
+    }
+
+    List<String> segmentsPresentInZK =
+        
segmentZKMetadataList.stream().map(SegmentZKMetadata::getSegmentName).collect(Collectors.toList());
+    try {
+      LOGGER.info("Fetch segments present in deep store that are beyond 
retention period for table: {}",
+          tableNameWithType);
+      segmentsToDelete =
+          findUntrackedSegmentsToDeleteFromDeepstore(tableNameWithType, 
retentionStrategy, segmentsPresentInZK);
+      _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.UNTRACKED_SEGMENTS_COUNT,
+          segmentsToDelete.size());
+
+      if (segmentsToDelete.size() > untrackedSegmentsDeletionBatchSize) {
+        LOGGER.info("Truncating segments to delete from {} to {} for table: 
{}",
+            segmentsToDelete.size(), untrackedSegmentsDeletionBatchSize, 
tableNameWithType);
+        segmentsToDelete = segmentsToDelete.subList(0, 
untrackedSegmentsDeletionBatchSize);
+      }
+    } catch (IOException e) {
+      LOGGER.warn("Unable to fetch segments from deep store that are beyond 
retention period for table: {}",
+          tableNameWithType);
+    }
+
+    return segmentsToDelete;
+  }
+
+
+  /**
+   * Identifies segments in deepstore that are ready for deletion based on the 
retention strategy.
+   *
+   * This method finds segments that are beyond the retention period and are 
ready to be purged.
+   * It only considers segments that do not have entries in ZooKeeper metadata 
i.e. untracked segments.
+   * The lastModified time of the file in deepstore is used to determine 
whether the segment
+   * should be retained or purged.
+   *
+   * @param tableNameWithType   Name of the table with type suffix (offline or realtime)
+   * @param retentionStrategy  Strategy to determine if a segment should be 
purged
+   * @param segmentsToExclude  List of segment names that should be excluded 
from deletion
+   * @return List of segment names that should be deleted from deepstore
+   * @throws IOException If there's an error accessing the filesystem
+   */
+  private List<String> findUntrackedSegmentsToDeleteFromDeepstore(String 
tableNameWithType,
+      RetentionStrategy retentionStrategy, List<String> segmentsToExclude)
+      throws IOException {
+
+    List<String> segmentsToDelete = new ArrayList<>();
+    String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
+    URI tableDataUri = 
URIUtils.getUri(_pinotHelixResourceManager.getDataDir(), rawTableName);
+    PinotFS pinotFS = PinotFSFactory.create(tableDataUri.getScheme());
+
+    long startTimeMs = System.currentTimeMillis();
+
+    List<FileMetadata> deepstoreFiles = 
pinotFS.listFilesWithMetadata(tableDataUri, false);
+    long listEndTimeMs = System.currentTimeMillis();
+    LOGGER.info("Found: {} segments in deepstore for table: {}. Time taken to 
list segments: {} ms",
+        deepstoreFiles.size(), tableNameWithType, listEndTimeMs - startTimeMs);
+
+    for (FileMetadata fileMetadata : deepstoreFiles) {
+      if (fileMetadata.isDirectory()) {
+        continue;
+      }
+
+      String segmentName = extractSegmentName(fileMetadata.getFilePath());
+      if (Strings.isEmpty(segmentName) || 
segmentsToExclude.contains(segmentName)) {
+        continue;
+      }
+
+      // determine whether the segment should be purged or not based on the 
last modified time of the file
+      long lastModifiedTime = fileMetadata.getLastModifiedTime();
+
+      if (retentionStrategy.isPurgeable(tableNameWithType, segmentName, 
lastModifiedTime)) {
+        segmentsToDelete.add(segmentName);
+      }
+    }
+    long endTimeMs = System.currentTimeMillis();
+    LOGGER.info(
+        "Took: {} ms to identify {} segments for deletion from deep store for 
table: {} as they have no corresponding"
+            + " entry in the property store.",
+        endTimeMs - startTimeMs, segmentsToDelete.size(), tableNameWithType);
+    return segmentsToDelete;
+  }
+
+  @Nullable
+  private String extractSegmentName(@Nullable String filePath) {
+    if (Strings.isEmpty(filePath)) {
+      return null;
+    }
+    String segmentName = filePath.substring(filePath.lastIndexOf("/") + 1);
+    if (segmentName.endsWith(TarCompressionUtils.TAR_GZ_FILE_EXTENSION)) {
+      segmentName = segmentName.substring(0, segmentName.length() - 
TarCompressionUtils.TAR_GZ_FILE_EXTENSION.length());
+    }
+    return segmentName;
+  }
+
   private void manageSegmentLineageCleanupForTable(TableConfig tableConfig) {
     String tableNameWithType = tableConfig.getTableName();
     List<String> segmentsToDelete = new ArrayList<>();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
index e8f6336961..8e31cce37c 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/RetentionStrategy.java
@@ -34,4 +34,15 @@ public interface RetentionStrategy {
    * @return Whether the segment should be purged
    */
   boolean isPurgeable(String tableNameWithType, SegmentZKMetadata 
segmentZKMetadata);
-}
+
+  /**
+   * Determines whether a segment is eligible for purging
+   *
+   * @param tableNameWithType The table name, including its type.
+   * @param segmentName The name of the segment to evaluate.
+   * @param segmentTimeMs The segment's timestamp in milliseconds, which could 
be the end time from ZK metadata or
+   *                      the modification time (mTime) for the file in deep 
store etc.
+   * @return {@code true} if the segment should be purged; {@code false} 
otherwise.
+   */
+  boolean isPurgeable(String tableNameWithType, String segmentName, long 
segmentTimeMs);
+  }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index b98fe5b534..cda94dbb5d 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -39,15 +39,19 @@ public class TimeRetentionStrategy implements 
RetentionStrategy {
 
   @Override
   public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata 
segmentZKMetadata) {
-    long endTimeMs = segmentZKMetadata.getEndTimeMs();
+    return isPurgeable(tableNameWithType, segmentZKMetadata.getSegmentName(), 
segmentZKMetadata.getEndTimeMs());
+  }
+
+  @Override
+  public boolean isPurgeable(String tableNameWithType, String segmentName, 
long segmentTimeMs) {
 
     // Check that the end time is between 1971 and 2071
-    if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
-      LOGGER.warn("Segment: {} of table: {} has invalid end time in millis: 
{}", segmentZKMetadata.getSegmentName(),
-          tableNameWithType, endTimeMs);
+    if (!TimeUtils.timeValueInValidRange(segmentTimeMs)) {
+      LOGGER.warn("Segment: {} of table: {} has invalid end time in millis: 
{}", segmentName,
+          tableNameWithType, segmentTimeMs);
       return false;
     }
 
-    return System.currentTimeMillis() - endTimeMs > _retentionMs;
+    return System.currentTimeMillis() - segmentTimeMs > _retentionMs;
   }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
index b3e656de9e..f4a71c55f1 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java
@@ -18,10 +18,16 @@
  */
 package org.apache.pinot.controller.helix.core.retention;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileTime;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixAdmin;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -42,12 +48,16 @@ import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import static 
org.apache.pinot.controller.helix.core.retention.RetentionManager.DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.*;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 
@@ -57,7 +67,31 @@ public class RetentionManagerTest {
   private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
   private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(TEST_TABLE_NAME);
 
-  private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, 
long dayAfterTomorrowTimeStamp) {
+  // Variables for real file test
+  private Path _tempDir;
+  private File _tableDir;
+
+  @BeforeMethod
+  public void setUp() throws Exception {
+    // Setup for real file test
+    _tempDir = Files.createTempDirectory("pinot-retention-test");
+    _tableDir = new File(_tempDir.toFile(), TEST_TABLE_NAME);
+    _tableDir.mkdirs();
+
+    final long pastMillisSinceEpoch = 1343001600000L;
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+  }
+
+  @AfterMethod
+  public void tearDown() throws Exception {
+    // Clean up the temporary directory after each test
+    if (_tempDir != null) {
+      FileUtils.deleteDirectory(_tempDir.toFile());
+    }
+  }
+
+  private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, 
long dayAfterTomorrowTimeStamp,
+      String untrackedSegmentsDeletionBatchSize, int 
untrackedSegmentsInDeepstoreSize) {
     List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
     // Create metadata for 10 segments really old, that will be removed by the 
retention manager.
     final int numOlderSegments = 10;
@@ -73,19 +107,77 @@ public class RetentionManagerTest {
           mockSegmentZKMetadata(dayAfterTomorrowTimeStamp, 
dayAfterTomorrowTimeStamp, timeUnit);
       segmentsZKMetadata.add(segmentZKMetadata);
     }
+
+    // Create actual segment files with specific modification times
+    // 1. A file that should be kept (in ZK metadata)
+    File segment1File = new File(_tableDir, 
segmentsZKMetadata.get(0).getSegmentName());
+    createFileWithContent(segment1File, "segment1 data");
+    setFileModificationTime(segment1File, timeUnit.toMillis(pastTimeStamp));
+
+    // 2. A file that should be kept (in ZK metadata)
+    File segment2File = new File(_tableDir, 
segmentsZKMetadata.get(10).getSegmentName());
+    createFileWithContent(segment2File, "segment2 data");
+    setFileModificationTime(segment2File, timeUnit.toMillis(pastTimeStamp));
+
+    // 3. A file that should not be deleted (not in ZK metadata but recent)
+    File segment3File = new File(_tableDir, "segment3.tar.gz");
+    createFileWithContent(segment3File, "segment3 data");
+    setFileModificationTime(segment3File, 
timeUnit.toMillis(dayAfterTomorrowTimeStamp));
+
+    int deletionBatchSize = untrackedSegmentsDeletionBatchSize == null ? 
DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE
+        : Integer.parseInt(untrackedSegmentsDeletionBatchSize);
+
+    // Create additional untracked segment files to test batch size limit
+    if (untrackedSegmentsInDeepstoreSize > 0) {
+      // Create more untracked segments
+      for (int i = 0; i < untrackedSegmentsInDeepstoreSize; i++) {
+        String segmentName = "extraSegment" + i;
+        File segmentFile = new File(_tableDir, segmentName);
+        createFileWithContent(segmentFile, "extra segment " + i + " data");
+        setFileModificationTime(segmentFile, timeUnit.toMillis(pastTimeStamp));
+        if (i < deletionBatchSize) {
+          // Add segments to the removed list till we reach 
untrackedSegmentsDeletionBatchSize
+          removedSegments.add(segmentName);
+        }
+      }
+    }
+
     final TableConfig tableConfig = createOfflineTableConfig();
+    // Set untrackedSegmentsDeletionBatchSize if not null
+    if (untrackedSegmentsDeletionBatchSize != null) {
+      
tableConfig.getValidationConfig().setUntrackedSegmentsDeletionBatchSize(untrackedSegmentsDeletionBatchSize);
+    }
+
     LeadControllerManager leadControllerManager = 
mock(LeadControllerManager.class);
     when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
     PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
-    setupPinotHelixResourceManager(tableConfig, removedSegments, 
pinotHelixResourceManager, leadControllerManager);
+
+    // Use appropriate setup based on test case
+    // In case of untrackedSegmentsDeletionBatchSize < 
untrackedSegmentsInDeepstoreSize, we cannot guarantee which
+    // files/ segments will be picked for deletion as there is no ordering/ 
sorting done before selecting
+    // untrackedSegmentsDeletionBatchSize out of 
untrackedSegmentsInDeepstoreSize to delete.
+    // For the case untrackedSegmentsDeletionBatchSize < 
untrackedSegmentsInDeepstoreSize we just check the size of the
+    // segments that will get deleted.
+    // If untrackedSegmentsDeletionBatchSize is not set, the batch size defaults to 100, so all the 
untracked segments are deleted when there are at most 100 of them
+    if (deletionBatchSize >= untrackedSegmentsInDeepstoreSize) {
+      // Use original setup for the case when all the segments will be included
+      setupPinotHelixResourceManager(tableConfig, removedSegments, 
pinotHelixResourceManager, leadControllerManager);
+    } else {
+      // Use batch size specific setup
+      setupPinotHelixResourceManagerForBatchSize(tableConfig, numOlderSegments,
+          deletionBatchSize, segmentsZKMetadata,
+          pinotHelixResourceManager, leadControllerManager);
+    }
 
     
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
     
when(pinotHelixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(segmentsZKMetadata);
+    
when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString());
 
     ControllerConf conf = new ControllerConf();
     ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
     conf.setRetentionControllerFrequencyInSeconds(0);
     conf.setDeletedSegmentsRetentionInDays(0);
+    conf.setUntrackedSegmentDeletionEnabled(true);
     RetentionManager retentionManager =
         new RetentionManager(pinotHelixResourceManager, leadControllerManager, 
conf, controllerMetrics);
     retentionManager.start();
@@ -93,52 +185,212 @@ public class RetentionManagerTest {
 
     SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
 
-    // Verify that the removeAgedDeletedSegments() method in deletion manager 
is actually called.
+    // Verify that the removeAgedDeletedSegments() method in deletion manager 
is called
     verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager);
 
-    // Verify that the deleteSegments method is actually called.
-    verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), 
anyList());
+    // Verify deleteSegments is called
+    verify(pinotHelixResourceManager, 
times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList());
   }
 
   @Test
-  public void testRetentionWithMinutes() {
+  public void testRetentionWithMinutesNoBatchSizeAndSegmentsInDeepStore() {
     final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
     final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60;
     final long pastMinutesSinceEpoch = 22383360L;
-    testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, 
minutesSinceEpochTimeStamp);
+    testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, 
minutesSinceEpochTimeStamp, null, 4);
+  }
+
+  @Test
+  public void testRetentionWithMinutesNoBatchSizeAndMoreSegmentsInDeepStore() {
+    // For this test the default batch size will get picked
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+    final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60;
+    final long pastMinutesSinceEpoch = 22383360L;
+    testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, 
minutesSinceEpochTimeStamp, null, 105);
+  }
+
+
+  @Test
+  public void 
testRetentionWithMinutesWithBatchSizeAndLessSegmentsInDeepStore() {
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+    final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60;
+    final long pastMinutesSinceEpoch = 22383360L;
+    testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, 
minutesSinceEpochTimeStamp, "5", 3);
+  }
+
+  @Test
+  public void 
testRetentionWithMinutesWithBatchSizeAndMoreSegmentsInDeepStore() {
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+    final long minutesSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60;
+    final long pastMinutesSinceEpoch = 22383360L;
+    testDifferentTimeUnits(pastMinutesSinceEpoch, TimeUnit.MINUTES, 
minutesSinceEpochTimeStamp, "5", 10);
+  }
+
+
+  @Test
+  public void testRetentionWithSecondsNoBatchSizeAndSegmentsInDeepStore() {
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+    final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60 * 60;
+    final long pastSecondsSinceEpoch = 1343001600L;
+    testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS, 
secondsSinceEpochTimeStamp, null, 4);
+  }
+
+  @Test
+  public void 
testRetentionWithSecondsWithBatchSizeAndLessSegmentsInDeepStore() {
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+    final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60 * 60;
+    final long pastSecondsSinceEpoch = 1343001600L;
+    testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS, 
secondsSinceEpochTimeStamp, "5", 3);
   }
 
   @Test
-  public void testRetentionWithSeconds() {
+  public void 
testRetentionWithSecondsWithBatchSizeAndMoreSegmentsInDeepStore() {
     final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
     final long secondsSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60 * 60;
     final long pastSecondsSinceEpoch = 1343001600L;
-    testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS, 
secondsSinceEpochTimeStamp);
+    testDifferentTimeUnits(pastSecondsSinceEpoch, TimeUnit.SECONDS, 
secondsSinceEpochTimeStamp, "5", 10);
   }
 
   @Test
-  public void testRetentionWithMillis() {
+  public void testRetentionWithMillisNoBatchSizeAndSegmentsInDeepStore() {
     final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
     final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60 * 60 * 1000;
     final long pastMillisSinceEpoch = 1343001600000L;
-    testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS, 
millisSinceEpochTimeStamp);
+    testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS, 
millisSinceEpochTimeStamp, null, 4);
   }
 
   @Test
-  public void testRetentionWithHours() {
+  public void testRetentionWithMillisWithBatchSizeAndLessSegmentsInDeepStore() 
{
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+    final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60 * 60 * 1000;
+    final long pastMillisSinceEpoch = 1343001600000L;
+    testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS, 
millisSinceEpochTimeStamp, "5", 3);
+  }
+
+  @Test
+  public void testRetentionWithMillisWithBatchSizeAndMoreSegmentsInDeepStore() 
{
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+    final long millisSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24 
* 60 * 60 * 1000;
+    final long pastMillisSinceEpoch = 1343001600000L;
+    testDifferentTimeUnits(pastMillisSinceEpoch, TimeUnit.MILLISECONDS, 
millisSinceEpochTimeStamp, "5", 10);
+  }
+
+  @Test
+  public void testRetentionWithHoursNoBatchSizeAndSegmentsInDeepStore() {
     final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
     final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24;
     final long pastHoursSinceEpoch = 373056L;
-    testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS, 
hoursSinceEpochTimeStamp);
+    testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS, 
hoursSinceEpochTimeStamp, null, 4);
   }
 
   @Test
-  public void testRetentionWithDays() {
+  public void testRetentionWithHoursWithBatchSizeAndLessSegmentsInDeepStore() {
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+    final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24;
+    final long pastHoursSinceEpoch = 373056L;
+    testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS, 
hoursSinceEpochTimeStamp, "5", 3);
+  }
+
+  @Test
+  public void testRetentionWithHoursWithBatchSizeAndMoreSegmentsInDeepStore() {
+    final long theDayAfterTomorrowSinceEpoch = System.currentTimeMillis() / 
1000 / 60 / 60 / 24 + 2;
+    final long hoursSinceEpochTimeStamp = theDayAfterTomorrowSinceEpoch * 24;
+    final long pastHoursSinceEpoch = 373056L;
+    testDifferentTimeUnits(pastHoursSinceEpoch, TimeUnit.HOURS, 
hoursSinceEpochTimeStamp, "5", 10);
+  }
+
+
+  @Test
+  public void testRetentionWithDaysNoBatchSizeAndSegmentsInDeepStore() {
+    final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 / 
60 / 60 / 24 + 2;
+    final long pastDaysSinceEpoch = 15544L;
+    testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, 
daysSinceEpochTimeStamp, null, 4);
+  }
+
+  @Test
+  public void testRetentionWithDaysWithBatchSizeAndLessSegmentsInDeepStore() {
     final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 / 
60 / 60 / 24 + 2;
     final long pastDaysSinceEpoch = 15544L;
-    testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, 
daysSinceEpochTimeStamp);
+    testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, 
daysSinceEpochTimeStamp, "5", 3);
   }
 
+  @Test
+  public void testRetentionWithDaysWithBatchSizeAndMoreSegmentsInDeepStore() {
+    final long daysSinceEpochTimeStamp = System.currentTimeMillis() / 1000 / 
60 / 60 / 24 + 2;
+    final long pastDaysSinceEpoch = 15544L;
+    testDifferentTimeUnits(pastDaysSinceEpoch, TimeUnit.DAYS, 
daysSinceEpochTimeStamp, "5", 10);
+  }
+
+  @Test
+  public void testOffByDefaultForUntrackedSegmentsDeletion() {
+    long pastTimeStamp = 15544L;
+    TimeUnit timeUnit = TimeUnit.DAYS;
+    long dayAfterTomorrowTimeStamp = System.currentTimeMillis() / 1000 / 60 / 
60 / 24 + 2;
+
+    List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>();
+    // Create metadata for 10 segments really old, that will be removed by the 
retention manager.
+    final int numOlderSegments = 10;
+    List<String> removedSegments = new ArrayList<>();
+    for (int i = 0; i < numOlderSegments; i++) {
+      SegmentZKMetadata segmentZKMetadata = 
mockSegmentZKMetadata(pastTimeStamp, pastTimeStamp, timeUnit);
+      segmentsZKMetadata.add(segmentZKMetadata);
+      removedSegments.add(segmentZKMetadata.getSegmentName());
+    }
+    // Create metadata for 5 segments that will not be removed.
+    for (int i = 0; i < 5; i++) {
+      SegmentZKMetadata segmentZKMetadata =
+          mockSegmentZKMetadata(dayAfterTomorrowTimeStamp, 
dayAfterTomorrowTimeStamp, timeUnit);
+      segmentsZKMetadata.add(segmentZKMetadata);
+    }
+
+    // Create actual segment files with specific modification times
+    // 1. A file tracked in ZK metadata (an old segment, so regular retention removes it; it is not untracked)
+    File segment1File = new File(_tableDir, 
segmentsZKMetadata.get(0).getSegmentName());
+    createFileWithContent(segment1File, "segment1 data");
+    setFileModificationTime(segment1File, timeUnit.toMillis(pastTimeStamp));
+
+    // 2. A file tracked in ZK metadata (a recent segment, so it must be kept)
+    File segment2File = new File(_tableDir, 
segmentsZKMetadata.get(10).getSegmentName());
+    createFileWithContent(segment2File, "segment2 data");
+    setFileModificationTime(segment2File, timeUnit.toMillis(pastTimeStamp));
+
+    // 3. A file that should not be deleted as the deletion of untracked 
segments is off by default
+    File segment3File = new File(_tableDir, "segment3.tar.gz");
+    createFileWithContent(segment3File, "segment3 data");
+    setFileModificationTime(segment3File, timeUnit.toMillis(pastTimeStamp));
+
+    final TableConfig tableConfig = createOfflineTableConfig();
+
+    LeadControllerManager leadControllerManager = 
mock(LeadControllerManager.class);
+    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
+    PinotHelixResourceManager pinotHelixResourceManager = 
mock(PinotHelixResourceManager.class);
+
+      setupPinotHelixResourceManager(tableConfig, removedSegments, 
pinotHelixResourceManager, leadControllerManager);
+
+    
when(pinotHelixResourceManager.getTableConfig(OFFLINE_TABLE_NAME)).thenReturn(tableConfig);
+    
when(pinotHelixResourceManager.getSegmentsZKMetadata(OFFLINE_TABLE_NAME)).thenReturn(segmentsZKMetadata);
+    
when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString());
+
+    ControllerConf conf = new ControllerConf();
+    ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+    conf.setRetentionControllerFrequencyInSeconds(0);
+    conf.setDeletedSegmentsRetentionInDays(0);
+
+    RetentionManager retentionManager =
+        new RetentionManager(pinotHelixResourceManager, leadControllerManager, 
conf, controllerMetrics);
+    retentionManager.start();
+    retentionManager.run();
+
+    SegmentDeletionManager deletionManager = 
pinotHelixResourceManager.getSegmentDeletionManager();
+
+    // Verify that the removeAgedDeletedSegments() method in deletion manager 
is called
+    verify(deletionManager, 
times(1)).removeAgedDeletedSegments(leadControllerManager);
+
+    // Verify deleteSegments is called
+    verify(pinotHelixResourceManager, 
times(1)).deleteSegments(eq(OFFLINE_TABLE_NAME), anyList());
+  }
+
+
   private TableConfig createOfflineTableConfig() {
     return new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setRetentionTimeUnit("DAYS")
         .setRetentionTimeValue("365").setNumReplicas(2).build();
@@ -179,6 +431,47 @@ public class RetentionManagerTest {
     }).when(resourceManager).deleteSegments(anyString(), anyList());
   }
 
+  /**
+   * Stubs the mocked resource manager for tests that only validate how many segments get deleted.
+   *
+   * <p>The stubbed {@code deleteSegments} answer asserts that the table name matches, that the first
+   * {@code numOlderSegments} entries of {@code segmentsZKMetadata} are all part of the deleted list, that
+   * {@code segment3.tar.gz} is not, and that the list size equals
+   * {@code numOlderSegments + untrackedSegmentsDeletionBatchSize}.
+   *
+   * @param tableConfig table config whose table name is returned by {@code getAllTables()}
+   * @param numOlderSegments number of leading entries in {@code segmentsZKMetadata} expected to be deleted
+   * @param untrackedSegmentsDeletionBatchSize number of untracked segments expected in the deleted list
+   * @param segmentsZKMetadata segment metadata list; its first {@code numOlderSegments} entries are checked
+   * @param resourceManager mocked resource manager to stub
+   * @param leadControllerManager lead controller manager passed to the stubbed deletion manager call
+   */
+  private void setupPinotHelixResourceManagerForBatchSize(TableConfig tableConfig, int numOlderSegments,
+      int untrackedSegmentsDeletionBatchSize, List<SegmentZKMetadata> segmentsZKMetadata,
+      PinotHelixResourceManager resourceManager, LeadControllerManager leadControllerManager) {
+
+    String tableNameWithType = tableConfig.getTableName();
+    when(resourceManager.getAllTables()).thenReturn(List.of(tableNameWithType));
+
+    ZkHelixPropertyStore<ZNRecord> propertyStore = mock(ZkHelixPropertyStore.class);
+    when(resourceManager.getPropertyStore()).thenReturn(propertyStore);
+
+    SegmentDeletionManager deletionManager = mock(SegmentDeletionManager.class);
+    doAnswer(invocationOnMock -> null).when(deletionManager).removeAgedDeletedSegments(leadControllerManager);
+    when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);
+
+    // Set up verification for deleteSegments with focus on the count and segment inclusion rules
+    doAnswer(invocationOnMock -> {
+      Object[] args = invocationOnMock.getArguments();
+      String tableNameArg = (String) args[0];
+      assertEquals(tableNameArg, tableNameWithType);
+      List<String> segmentListArg = (List<String>) args[1];
+
+      // Verify all the old metadata segments are included
+      for (int i = 0; i < numOlderSegments; i++) {
+        assertTrue(segmentListArg.contains(segmentsZKMetadata.get(i).getSegmentName()));
+      }
+
+      // Verify segment3 (recent untracked segment) is NOT included
+      assertFalse(segmentListArg.contains("segment3.tar.gz"));
+
+      // Calculate expected total segments that should be deleted
+      // ZK metadata segments + untracked segments up to the batch size limit
+      int expectedTotalSegments = numOlderSegments + untrackedSegmentsDeletionBatchSize;
+
+      // Verify the total count is as expected; (actual, expected) order matches the table-name assertion above
+      assertEquals(segmentListArg.size(), expectedTotalSegments);
+
+      return null;
+    }).when(resourceManager).deleteSegments(anyString(), anyList());
+  }
+
+
   // This test makes sure that we clean up the segments marked OFFLINE in 
realtime for more than 7 days
   @Test
   public void testRealtimeLLCCleanup() {
@@ -194,6 +487,7 @@ public class RetentionManagerTest {
     PinotHelixResourceManager pinotHelixResourceManager =
         setupSegmentMetadata(tableConfig, now, initialNumSegments, 
removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, 
pinotHelixResourceManager, leadControllerManager);
+    
when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString());
 
     ControllerConf conf = new ControllerConf();
     ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
@@ -226,6 +520,7 @@ public class RetentionManagerTest {
     PinotHelixResourceManager pinotHelixResourceManager =
         setupSegmentMetadataForPausedTable(tableConfig, now, removedSegments);
     setupPinotHelixResourceManager(tableConfig, removedSegments, 
pinotHelixResourceManager, leadControllerManager);
+    
when(pinotHelixResourceManager.getDataDir()).thenReturn(_tempDir.toString());
 
     ControllerConf conf = new ControllerConf();
     ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
@@ -372,4 +667,27 @@ public class RetentionManagerTest {
     
when(segmentZKMetadata.getEndTimeMs()).thenReturn(timeUnit.toMillis(endTime));
     return segmentZKMetadata;
   }
+
+  /**
+   * Helper method to create a file with content.
+   *
+   * <p>The content is encoded as UTF-8 so the bytes written do not depend on the platform default charset.
+   *
+   * @param file the file to write
+   * @param content the text to write into the file
+   * @throws RuntimeException wrapping the {@link IOException} if the write fails
+   */
+  private void createFileWithContent(File file, String content) {
+    try {
+      Files.write(file.toPath(), content.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Helper method to set file modification time.
+   *
+   * @param file the file whose last-modified time is updated
+   * @param timestamp the new last-modified time, in milliseconds since the epoch
+   * @throws RuntimeException wrapping the {@link IOException} if the update fails
+   */
+  private void setFileModificationTime(File file, long timestamp) {
+    FileTime fileTime = FileTime.fromMillis(timestamp);
+    try {
+      Files.setLastModifiedTime(file.toPath(), fileTime);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 592a6c1960..2bfdc051a6 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -55,6 +55,8 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
   // For more usage of this field, please refer to this design doc: 
https://tinyurl.com/f63ru4sb
   private String _peerSegmentDownloadScheme;
 
+  private String _untrackedSegmentsDeletionBatchSize;
+
   /**
    * @deprecated Use {@link InstanceAssignmentConfig} instead
    */
@@ -250,4 +252,12 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
   public void setMinimizeDataMovement(boolean minimizeDataMovement) {
     _minimizeDataMovement = minimizeDataMovement;
   }
+
+  public String getUntrackedSegmentsDeletionBatchSize() {
+    return _untrackedSegmentsDeletionBatchSize;
+  }
+
+  public void setUntrackedSegmentsDeletionBatchSize(String 
untrackedSegmentsDeletionBatchSize) {
+    _untrackedSegmentsDeletionBatchSize = untrackedSegmentsDeletionBatchSize;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to