This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dea5537c27 Add support for providing custom retention time for untracked segments (#16719)
9dea5537c27 is described below

commit 9dea5537c2778564c17bad6f8543c3e04d27fac8
Author: 9aman <[email protected]>
AuthorDate: Thu Sep 11 21:36:24 2025 +0530

    Add support for providing custom retention time for untracked segments (#16719)
    
    * Add support for providing custom retention time for untracked segments
    
    * Remove unnecessary configs added
    
    * Ensure valid untracked retention time unit and value are provided in the table config
    
    * Make untracked segments retention configurable via cluster config
---
 .../apache/pinot/controller/ControllerConf.java    |  8 ++++
 .../helix/core/SegmentDeletionManager.java         |  2 +-
 .../helix/core/retention/RetentionManager.java     | 51 ++++++++++++++++----
 .../core/util/SegmentDeletionManagerTest.java      |  6 +++
 .../segment/local/utils/TableConfigUtils.java      | 56 +++++++++++++++++++---
 .../SegmentsValidationAndRetentionConfig.java      | 18 +++++++
 6 files changed, 123 insertions(+), 18 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index 1a849cab307..c96355d37e2 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -265,6 +265,9 @@ public class ControllerConf extends PinotConfiguration {
     // Untracked segments are those that exist in deep store but have no 
corresponding entry in the ZK property store.
     public static final String ENABLE_UNTRACKED_SEGMENT_DELETION =
         "controller.retentionManager.untrackedSegmentDeletionEnabled";
+    public static final String UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS =
+        "controller.retentionManager.untrackedSegmentsRetentionTimeInDays";
+    public static final int DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS 
= 3;
     public static final int MIN_INITIAL_DELAY_IN_SECONDS = 120;
     public static final int MAX_INITIAL_DELAY_IN_SECONDS = 300;
     public static final int DEFAULT_SPLIT_COMMIT_TMP_SEGMENT_LIFETIME_SECOND = 
60 * 60; // 1 Hour.
@@ -1199,6 +1202,11 @@ public class ControllerConf extends PinotConfiguration {
     return 
getProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION, 
false);
   }
 
+  /**
+   * Returns the retention period, in days, for untracked segments (segments present in deep store with no
+   * corresponding entry in the ZK property store). Read from
+   * {@code controller.retentionManager.untrackedSegmentsRetentionTimeInDays}, with
+   * {@code DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS} (3) passed as the default.
+   */
+  public int getUntrackedSegmentsRetentionTimeInDays() {
+    return 
getProperty(ControllerPeriodicTasksConf.UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS,
+        
ControllerPeriodicTasksConf.DEFAULT_UNTRACKED_SEGMENTS_RETENTION_TIME_IN_DAYS);
+  }
+
   public void setUntrackedSegmentDeletionEnabled(boolean 
untrackedSegmentDeletionEnabled) {
     setProperty(ControllerPeriodicTasksConf.ENABLE_UNTRACKED_SEGMENT_DELETION, 
untrackedSegmentDeletionEnabled);
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index a87d8011529..5638ebc16ce 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -197,7 +197,7 @@ public class SegmentDeletionManager {
       // TODO: If removing segments from deep store fails (e.g. controller 
crashes, deep store unavailable), these
       //       segments will become orphans and not easy to track because 
their ZK metadata are already deleted.
       //       Consider removing segments from deep store before cleaning up 
the ZK metadata.
-      removeSegmentsFromStore(tableName, segmentsToDelete, 
deletedSegmentsRetentionMs);
+      removeSegmentsFromStoreInBatch(tableName, segmentsToDelete, 
deletedSegmentsRetentionMs);
     }
 
     LOGGER.info("Deleted {} segments from table {}:{}", 
segmentsToDelete.size(), tableName,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
index 3f6c162b81a..a92dffff3f9 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/RetentionManager.java
@@ -80,6 +80,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
   public static final int DEFAULT_UNTRACKED_SEGMENTS_DELETION_BATCH_SIZE = 100;
   private static final RetryPolicy DEFAULT_RETRY_POLICY = 
RetryPolicies.randomDelayRetryPolicy(20, 100L, 200L);
   private final boolean _untrackedSegmentDeletionEnabled;
+  private final int _untrackedSegmentsRetentionTimeInDays;
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RetentionManager.class);
   private final boolean _isHybridTableRetentionStrategyEnabled;
@@ -92,6 +93,7 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
         config.getRetentionManagerInitialDelayInSeconds(), 
pinotHelixResourceManager, leadControllerManager,
         controllerMetrics);
     _untrackedSegmentDeletionEnabled = 
config.getUntrackedSegmentDeletionEnabled();
+    _untrackedSegmentsRetentionTimeInDays = 
config.getUntrackedSegmentsRetentionTimeInDays();
     _isHybridTableRetentionStrategyEnabled = 
config.isHybridTableRetentionStrategyEnabled();
     _brokerServiceHelper = brokerServiceHelper;
     LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", 
getIntervalInSeconds());
@@ -148,9 +150,13 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
       return;
     }
 
+    RetentionStrategy untrackedSegmentsRetentionStrategy =
+        createUntrackedSegmentsRetentionStrategy(validationConfig, 
tableNameWithType);
+
     // Scan all segment ZK metadata and purge segments if necessary
     if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) {
-      manageRetentionForOfflineTable(tableNameWithType, retentionStrategy, 
untrackedSegmentsDeletionBatchSize);
+      manageRetentionForOfflineTable(tableNameWithType, retentionStrategy, 
untrackedSegmentsDeletionBatchSize,
+          untrackedSegmentsRetentionStrategy);
     } else {
       String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
       TableConfig offlineTableConfig = 
_pinotHelixResourceManager.getOfflineTableConfig(rawTableName);
@@ -159,20 +165,21 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
         // TODO: handle the orphan segment deletion for hybrid table
         manageRetentionForHybridTable(tableConfig, offlineTableConfig);
       } else {
-        manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy, 
untrackedSegmentsDeletionBatchSize);
+        manageRetentionForRealtimeTable(tableNameWithType, retentionStrategy, 
untrackedSegmentsDeletionBatchSize,
+            untrackedSegmentsRetentionStrategy);
       }
     }
   }
 
   private void manageRetentionForOfflineTable(String offlineTableName, 
RetentionStrategy retentionStrategy,
-      int untrackedSegmentsDeletionBatchSize) {
+      int untrackedSegmentsDeletionBatchSize, RetentionStrategy 
untrackedSegmentsRetentionStrategy) {
     List<SegmentZKMetadata> segmentZKMetadataList = 
_pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
 
     // fetch those segments that are beyond the retention period and don't 
have an entry in ZK i.e.
     // SegmentZkMetadata is missing for those segments
     List<String> segmentsToDelete =
         getSegmentsToDeleteFromDeepstore(offlineTableName, retentionStrategy, 
segmentZKMetadataList,
-            untrackedSegmentsDeletionBatchSize);
+            untrackedSegmentsDeletionBatchSize, 
untrackedSegmentsRetentionStrategy);
 
     for (SegmentZKMetadata segmentZKMetadata : segmentZKMetadataList) {
       if (retentionStrategy.isPurgeable(offlineTableName, segmentZKMetadata)) {
@@ -186,14 +193,14 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
   }
 
   private void manageRetentionForRealtimeTable(String realtimeTableName, 
RetentionStrategy retentionStrategy,
-      int untrackedSegmentsDeletionBatchSize) {
+      int untrackedSegmentsDeletionBatchSize, RetentionStrategy 
untrackedSegmentsRetentionStrategy) {
     List<SegmentZKMetadata> segmentZKMetadataList = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
 
     // fetch those segments that are beyond the retention period and don't 
have an entry in ZK i.e.
     // SegmentZkMetadata is missing for those segments
     List<String> segmentsToDelete =
         getSegmentsToDeleteFromDeepstore(realtimeTableName, retentionStrategy, 
segmentZKMetadataList,
-            untrackedSegmentsDeletionBatchSize);
+            untrackedSegmentsDeletionBatchSize, 
untrackedSegmentsRetentionStrategy);
 
     IdealState idealState = _pinotHelixResourceManager.getHelixAdmin()
         
.getResourceIdealState(_pinotHelixResourceManager.getHelixClusterName(), 
realtimeTableName);
@@ -300,7 +307,8 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
   }
 
   private List<String> getSegmentsToDeleteFromDeepstore(String 
tableNameWithType, RetentionStrategy retentionStrategy,
-      List<SegmentZKMetadata> segmentZKMetadataList, int 
untrackedSegmentsDeletionBatchSize) {
+      List<SegmentZKMetadata> segmentZKMetadataList, int 
untrackedSegmentsDeletionBatchSize,
+      RetentionStrategy untrackedSegmentsRetentionStrategy) {
     List<String> segmentsToDelete = new ArrayList<>();
     String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
     boolean isHybridTable = 
_pinotHelixResourceManager.hasOfflineTable(rawTableName)
@@ -348,7 +356,8 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
       LOGGER.info("Fetch segments present in deep store that are beyond 
retention period for table: {}",
           tableNameWithType);
       segmentsToDelete =
-          findUntrackedSegmentsToDeleteFromDeepstore(tableNameWithType, 
retentionStrategy, segmentsPresentInZK);
+          findUntrackedSegmentsToDeleteFromDeepstore(tableNameWithType, 
retentionStrategy, segmentsPresentInZK,
+              untrackedSegmentsRetentionStrategy);
       _controllerMetrics.setValueOfTableGauge(tableNameWithType, 
ControllerGauge.UNTRACKED_SEGMENTS_COUNT,
           segmentsToDelete.size());
 
@@ -380,7 +389,8 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
    * @throws IOException If there's an error accessing the filesystem
    */
   private List<String> findUntrackedSegmentsToDeleteFromDeepstore(String 
tableNameWithType,
-      RetentionStrategy retentionStrategy, List<String> segmentsToExclude)
+      RetentionStrategy retentionStrategy, List<String> segmentsToExclude,
+      RetentionStrategy untrackedSegmentsRetentionStrategy)
       throws IOException {
 
     List<String> segmentsToDelete = new ArrayList<>();
@@ -408,7 +418,11 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
       // determine whether the segment should be purged or not based on the 
last modified time of the file
       long lastModifiedTime = fileMetadata.getLastModifiedTime();
 
-      if (retentionStrategy.isPurgeable(tableNameWithType, segmentName, 
lastModifiedTime)) {
+      // the segment is either beyond the table retention or the retention set 
for untracked segments
+      boolean shouldDelete = retentionStrategy.isPurgeable(tableNameWithType, 
segmentName, lastModifiedTime)
+          || untrackedSegmentsRetentionStrategy.isPurgeable(tableNameWithType, 
segmentName, lastModifiedTime);
+
+      if (shouldDelete) {
         segmentsToDelete.add(segmentName);
       }
     }
@@ -483,4 +497,21 @@ public class RetentionManager extends 
ControllerPeriodicTask<Void> {
     }
     LOGGER.info("Segment lineage metadata clean-up is successfully processed 
for table: {}", tableNameWithType);
   }
+
+  /**
+   * Builds the retention strategy used to decide when untracked segments can be purged from deep store.
+   *
+   * <p>The table-level {@code untrackedSegmentsRetentionTimeUnit}/{@code untrackedSegmentsRetentionTimeValue} pair
+   * is used when both are set and parse successfully; otherwise the controller-level default (in days) is used.
+   *
+   * @param validationConfig segments config of the table, source of the optional table-level override
+   * @param tableNameWithType table name with type suffix, used for logging only
+   * @return strategy built from the table-level override, or from the controller-level default
+   */
+  private RetentionStrategy createUntrackedSegmentsRetentionStrategy(
+      SegmentsValidationAndRetentionConfig validationConfig, String tableNameWithType) {
+    String timeUnit = validationConfig.getUntrackedSegmentsRetentionTimeUnit();
+    String timeValue = validationConfig.getUntrackedSegmentsRetentionTimeValue();
+    if (timeUnit != null && timeValue != null) {
+      try {
+        return new TimeRetentionStrategy(TimeUnit.valueOf(timeUnit.toUpperCase()), Long.parseLong(timeValue));
+      } catch (Exception e) {
+        // Best effort: a bad table-level override must not stop retention. Log the fallback that is really
+        // applied, since the default is configurable and therefore not necessarily 3 days.
+        LOGGER.warn("Invalid untracked segments retention time: {} {} for table: {}, using default {} days",
+            timeUnit, timeValue, tableNameWithType, _untrackedSegmentsRetentionTimeInDays, e);
+      }
+    }
+    return new TimeRetentionStrategy(TimeUnit.DAYS, _untrackedSegmentsRetentionTimeInDays);
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
index cf1a091409f..3dcac7b01ea 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java
@@ -580,6 +580,12 @@ public class SegmentDeletionManagerTest {
         @Nullable Long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
       _segmentsToRetry.addAll(segmentIds);
     }
+
+    // Test override: only records the segments passed in; nothing is removed from any store here.
+    @Override
+    public void removeSegmentsFromStoreInBatch(String tableNameWithType, 
List<String> segments,
+        @Nullable Long deletedSegmentsRetentionMs) {
+      _segmentsRemovedFromStore.addAll(segments);
+    }
   }
 
   public static class FakePinotFs extends LocalPinotFS {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 19ce8c33813..3871b8d6a9c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -269,14 +269,56 @@ public final class TableConfigUtils {
     }
 
     // Retention may not be specified. Ignore validation in that case.
-    String timeUnitString = segmentsConfig.getRetentionTimeUnit();
-    if (timeUnitString == null || timeUnitString.isEmpty()) {
-      return;
+    String retentionTimeUnitString = segmentsConfig.getRetentionTimeUnit();
+    if (retentionTimeUnitString != null && !retentionTimeUnitString.isEmpty()) 
{
+      try {
+        TimeUnit.valueOf(retentionTimeUnitString.toUpperCase());
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            String.format("Table: %s, invalid retention time unit: %s", 
tableName, retentionTimeUnitString));
+      }
     }
-    try {
-      TimeUnit.valueOf(timeUnitString.toUpperCase());
-    } catch (Exception e) {
-      throw new IllegalStateException(String.format("Table: %s, invalid time 
unit: %s", tableName, timeUnitString));
+
+    // Untracked segments retention may not be specified. Ignore validation in 
that case.
+    String untrackedSegmentsRetentionTimeUnitString = 
segmentsConfig.getUntrackedSegmentsRetentionTimeUnit();
+    String untrackedSegmentsRetentionTimeValueString = 
segmentsConfig.getUntrackedSegmentsRetentionTimeValue();
+
+    boolean hasUntrackedTimeUnit =
+        untrackedSegmentsRetentionTimeUnitString != null && 
!untrackedSegmentsRetentionTimeUnitString.isEmpty();
+    boolean hasUntrackedTimeValue =
+        untrackedSegmentsRetentionTimeValueString != null && 
!untrackedSegmentsRetentionTimeValueString.isEmpty();
+
+    if (hasUntrackedTimeUnit && !hasUntrackedTimeValue) {
+      throw new IllegalStateException(String.format(
+          "Table: %s, untracked retention time value must be specified when 
untracked retention time unit is provided",
+          tableName));
+    }
+    if (hasUntrackedTimeValue && !hasUntrackedTimeUnit) {
+      throw new IllegalStateException(String.format(
+          "Table: %s, untracked retention time unit must be specified when 
untracked retention time value is provided",
+          tableName));
+    }
+
+    if (hasUntrackedTimeUnit) {
+      try {
+        
TimeUnit.valueOf(untrackedSegmentsRetentionTimeUnitString.toUpperCase());
+      } catch (Exception e) {
+        throw new IllegalStateException(String.format("Table: %s, invalid 
untracked retention time unit: %s", tableName,
+            untrackedSegmentsRetentionTimeUnitString));
+      }
+
+      long timeValue;
+      try {
+        timeValue = Long.parseLong(untrackedSegmentsRetentionTimeValueString);
+      } catch (NumberFormatException e) {
+        throw new IllegalStateException(String.format("Table: %s, invalid untracked retention time value: %s",
+            tableName, untrackedSegmentsRetentionTimeValueString), e);
+      }
+      // Checked outside the try so that this specific message is not replaced by the generic "invalid" one.
+      if (timeValue <= 0) {
+        throw new IllegalStateException(String.format("Table: %s, untracked retention time value must be positive: %s",
+            tableName, untrackedSegmentsRetentionTimeValueString));
+      }
     }
   }
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
index 536128b7c41..c5ba2f1e922 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/SegmentsValidationAndRetentionConfig.java
@@ -54,6 +54,8 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
   private String _peerSegmentDownloadScheme;
 
   private String _untrackedSegmentsDeletionBatchSize;
+  private String _untrackedSegmentsRetentionTimeUnit;
+  private String _untrackedSegmentsRetentionTimeValue;
 
   /**
    * @deprecated Use {@link InstanceAssignmentConfig} instead
@@ -245,4 +247,20 @@ public class SegmentsValidationAndRetentionConfig extends 
BaseJsonConfig {
   public void setUntrackedSegmentsDeletionBatchSize(String 
untrackedSegmentsDeletionBatchSize) {
     _untrackedSegmentsDeletionBatchSize = untrackedSegmentsDeletionBatchSize;
   }
+
+  /**
+   * Returns the time unit of the table-level retention for untracked segments as the raw configured string,
+   * or {@code null} when it has not been set.
+   */
+  public String getUntrackedSegmentsRetentionTimeUnit() {
+    return _untrackedSegmentsRetentionTimeUnit;
+  }
+
+  /** Sets the time unit (raw string) of the table-level retention for untracked segments. */
+  public void setUntrackedSegmentsRetentionTimeUnit(String 
untrackedSegmentsRetentionTimeUnit) {
+    _untrackedSegmentsRetentionTimeUnit = untrackedSegmentsRetentionTimeUnit;
+  }
+
+  /**
+   * Returns the time value of the table-level retention for untracked segments as the raw configured string,
+   * or {@code null} when it has not been set.
+   */
+  public String getUntrackedSegmentsRetentionTimeValue() {
+    return _untrackedSegmentsRetentionTimeValue;
+  }
+
+  /** Sets the time value (raw string) of the table-level retention for untracked segments. */
+  public void setUntrackedSegmentsRetentionTimeValue(String 
untrackedSegmentsRetentionTimeValue) {
+    _untrackedSegmentsRetentionTimeValue = untrackedSegmentsRetentionTimeValue;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to