This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-8342
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-8342 by this push:
     new 6542b31ff8a HDDS-12793. Recursive iterate FSO directory. (#9042)
6542b31ff8a is described below

commit 6542b31ff8a4b21f8faf34614f8c53463d828bc0
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Nov 4 12:16:54 2025 +0800

    HDDS-12793. Recursive iterate FSO directory. (#9042)
---
 .../common/src/main/resources/ozone-default.xml    |  10 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   3 +
 .../ozone/om/service/KeyLifecycleService.java      | 599 ++++++++++++++-------
 .../om/service/KeyLifecycleServiceMetrics.java     |  10 +
 .../ozone/om/service/TestKeyLifecycleService.java  | 140 ++++-
 5 files changed, 573 insertions(+), 189 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 3990f3b4898..7680fd62f12 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4716,5 +4716,13 @@
     <description>Number of workers executed of key lifecycle management 
service. This
       configuration should be set to greater than 0.</description>
   </property>
-
+  <property>
+    <name>ozone.lifecycle.service.delete.cached.directory.max-count</name>
+    <value>1000000</value>
+    <tag>OZONE</tag>
+    <description>Max numbers of directory objects held in memory stack for 
recursive FSO bucket evaluating for
+      a lifecycle evaluating task. Once the cached directory objects exceeds 
this limit, the evaluation of the involved
+      directories will abort.
+    </description>
+  </property>
 </configuration>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index c44a0cdf380..94e366beaed 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -195,6 +195,9 @@ private OMConfigKeys() {
   public static final String OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE =
       "ozone.lifecycle.service.delete.batch-size";
   public static final int 
OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT = 1000;
+  public static final String 
OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT =
+      "ozone.lifecycle.service.delete.cached.directory.max-count";
+  public static final long 
OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT_DEFAULT = 1000000;
 
   /**
    * OM Ratis related configurations.
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
index 09b0c55efee..9e7112f1b31 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
@@ -22,6 +22,8 @@
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED_DEFAULT;
 import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
@@ -29,17 +31,24 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
+import jakarta.annotation.Nullable;
 import java.io.IOException;
 import java.nio.file.Paths;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.BackgroundService;
@@ -50,6 +59,8 @@
 import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.ClientVersion;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.KeyManager;
@@ -92,6 +103,7 @@ public class KeyLifecycleService extends BackgroundService {
   private final OzoneManager ozoneManager;
   private int keyDeleteBatchSize;
   private int listMaxSize;
+  private long cachedDirMaxCount;
   private final AtomicBoolean suspended;
   private KeyLifecycleServiceMetrics metrics;
   private boolean isServiceEnabled;
@@ -116,6 +128,8 @@ public KeyLifecycleService(OzoneManager ozoneManager,
     Preconditions.checkArgument(keyDeleteBatchSize > 0,
         OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE + " should be a positive 
value.");
     this.listMaxSize = keyDeleteBatchSize >= 10000 ? keyDeleteBatchSize : 
10000;
+    this.cachedDirMaxCount = 
conf.getLong(OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT,
+        OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT_DEFAULT);
     this.suspended = new AtomicBoolean(false);
     this.metrics = KeyLifecycleServiceMetrics.create();
     this.isServiceEnabled = 
conf.getBoolean(OZONE_KEY_LIFECYCLE_SERVICE_ENABLED,
@@ -226,6 +240,7 @@ public final class LifecycleActionTask implements 
BackgroundTask {
     private long sizeKeyDeleted = 0;
     private long numKeyRenamed = 0;
     private long sizeKeyRenamed = 0;
+    private long numDirRenamed = 0;
 
     public LifecycleActionTask(OmLifecycleConfiguration lcConfig) {
       this.policy = lcConfig;
@@ -316,17 +331,11 @@ public BackgroundTaskResult call() {
         // If trash is enabled, move files to trash, instead of send delete 
requests.
         // OBS bucket doesn't support trash.
         if (bucket.getBucketLayout() == OBJECT_STORE) {
-          sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), 
bucket.getBucketName(),
-              expiredKeyList, false);
-        } else if (ozoneTrash != null) {
-          // move keys to trash
-          // TODO: move directory to trash in next patch
-          moveKeysToTrash(bucket, expiredKeyList);
-        } else {
           sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), 
bucket.getBucketName(), expiredKeyList, false);
-          if (!expiredDirList.isEmpty()) {
-            sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), 
bucket.getBucketName(), expiredDirList, true);
-          }
+        } else {
+          // handle keys first, then directories
+          handleAndClearFullList(bucket, expiredKeyList, false);
+          handleAndClearFullList(bucket, expiredDirList, true);
         }
         onSuccess(bucketKey);
       }
@@ -342,21 +351,17 @@ private void evaluateFSOBucket(OmVolumeArgs volume, 
OmBucketInfo bucket, String
       List<OmLCRule> prefixStartsWithTrashRuleList =
           ruleList.stream().filter(r -> r.isPrefixEnable() && 
r.getEffectivePrefix().startsWith(
               TRASH_PREFIX + 
OzoneConsts.OM_KEY_PREFIX)).collect(Collectors.toList());
-      List<OmLCRule> directoryStylePrefixRuleList =
-          ruleList.stream().filter(r -> 
r.isDirectoryStylePrefix()).collect(Collectors.toList());
-      List<OmLCRule> nonDirectoryStylePrefixRuleList =
-          ruleList.stream().filter(r -> r.isPrefixEnable() && 
!r.isDirectoryStylePrefix()).collect(Collectors.toList());
+      List<OmLCRule> prefixRuleList =
+          ruleList.stream().filter(r -> 
r.isPrefixEnable()).collect(Collectors.toList());
       // r.isPrefixEnable() == false means empty filter
       List<OmLCRule> noPrefixRuleList =
           ruleList.stream().filter(r -> 
!r.isPrefixEnable()).collect(Collectors.toList());
 
-      directoryStylePrefixRuleList.removeAll(prefixStartsWithTrashRuleList);
-      nonDirectoryStylePrefixRuleList.removeAll(prefixStartsWithTrashRuleList);
+      prefixRuleList.removeAll(prefixStartsWithTrashRuleList);
       prefixStartsWithTrashRuleList.stream().forEach(
           r -> LOG.info("Skip rule {} as its prefix starts with {}", r, 
TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX));
 
-      Table<String, OmDirectoryInfo> directoryInfoTable = 
omMetadataManager.getDirectoryTable();
-      for (OmLCRule rule : directoryStylePrefixRuleList) {
+      for (OmLCRule rule : prefixRuleList) {
         // find KeyInfo of each directory for prefix
         List<OmDirectoryInfo> dirList;
         try {
@@ -366,171 +371,307 @@ private void evaluateFSOBucket(OmVolumeArgs volume, 
OmBucketInfo bucket, String
           // skip this rule if some directory doesn't exist for this rule's 
prefix
           continue;
         }
-        // use last directory's object ID to iterate the keys
-        String prefix = OM_KEY_PREFIX + volume.getObjectID() +
-            OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
-        StringBuffer directoryPath = new StringBuffer();
+        StringBuffer lastDirPath = new StringBuffer();
+        OmDirectoryInfo lastDir = null;
         if (!dirList.isEmpty()) {
-          OmDirectoryInfo lastDir = dirList.get(dirList.size() - 1);
-          prefix += lastDir.getObjectID();
-          for (OmDirectoryInfo dir : dirList) {
-            directoryPath.append(dir.getName()).append(OM_KEY_PREFIX);
+          lastDir = dirList.get(dirList.size() - 1);
+          for (int i = 0; i < dirList.size(); i++) {
+            lastDirPath.append(dirList.get(i).getName());
+            if (i != dirList.size() - 1) {
+              lastDirPath.append(OM_KEY_PREFIX);
+            }
+          }
+          if (lastDirPath.toString().startsWith(TRASH_PREFIX)) {
+            LOG.info("Skip evaluate trash directory {}", lastDirPath);
+          } else {
+            evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, 
lastDirPath.toString(), lastDir,
+                Arrays.asList(rule), expiredKeyList, expiredDirList);
           }
-          if ((directoryPath.toString().equals(rule.getEffectivePrefix()) ||
-              directoryPath.toString().equals(rule.getEffectivePrefix() + 
OM_KEY_PREFIX))
-              && rule.match(lastDir, directoryPath.toString())) {
-            if (expiredDirList.isFull()) {
-              // if expiredDirList is full, send delete/rename request for 
expired directories
-              handleAndClearFullList(bucket, expiredDirList, true);
+
+          if (!rule.getEffectivePrefix().endsWith(OM_KEY_PREFIX)) {
+            // if the prefix doesn't end with "/", then also search and 
evaluate the directory itself
+            // for example, "dir1/dir2" matches both directory "dir1/dir2" and 
"dir1/dir22"
+            // or "dir1" matches both directory "dir1" and "dir11"
+            long objID;
+            String objPrefix;
+            String objPath;
+            if (dirList.size() > 1) {
+              OmDirectoryInfo secondLastDir = dirList.get(dirList.size() - 2);
+              objID = secondLastDir.getObjectID();
+              objPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX 
+ bucket.getObjectID() +
+                  OM_KEY_PREFIX + secondLastDir.getObjectID();
+              StringBuffer secondLastDirPath = new StringBuffer();
+              for (int i = 0; i < dirList.size() - 1; i++) {
+                secondLastDirPath.append(dirList.get(i).getName());
+                if (i != dirList.size() - 2) {
+                  secondLastDirPath.append(OM_KEY_PREFIX);
+                }
+              }
+              objPath = secondLastDirPath.toString();
+            } else {
+              objID = bucket.getObjectID();
+              objPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX 
+ bucket.getObjectID() +
+                  OM_KEY_PREFIX + bucket.getObjectID();
+              objPath = "";
+            }
+            try {
+              SubDirectorySummary subDirSummary = getSubDirectory(objID, 
objPrefix, omMetadataManager);
+              for (OmDirectoryInfo subDir : subDirSummary.getSubDirList()) {
+                String subDirPath = objPath.isEmpty() ? subDir.getName() : 
objPath + OM_KEY_PREFIX + subDir.getName();
+                if (!subDir.getName().equals(TRASH_PREFIX) && 
subDirPath.startsWith(rule.getEffectivePrefix()) &&
+                    (lastDir == null || subDir.getObjectID() != 
lastDir.getObjectID())) {
+                  evaluateKeyAndDirTable(bucket, volume.getObjectID(), 
keyTable, subDirPath, subDir,
+                      Arrays.asList(rule), expiredKeyList, expiredDirList);
+                }
+              }
+            } catch (IOException e) {
+              // log failure and continue the process
+              LOG.warn("Failed to get sub directories of {} under {}/{}", 
objPrefix,
+                  bucket.getVolumeName(), bucket.getBucketName(), e);
+              return;
             }
-            expiredDirList.add(directoryPath.toString(), 0, 
lastDir.getUpdateID());
           }
+        } else {
+          evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null,
+              Arrays.asList(rule), expiredKeyList, expiredDirList);
         }
+      }
+
+      if (!noPrefixRuleList.isEmpty()) {
+        evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", 
null,
+            noPrefixRuleList, expiredKeyList, expiredDirList);
+      }
+    }
 
-        LOG.info("Prefix {} for {}", prefix, bucketKey);
-        evaluateKeyTable(keyTable, prefix, directoryPath.toString(), rule, 
expiredKeyList, bucket);
-        evaluateDirTable(directoryInfoTable, prefix, directoryPath.toString(), 
rule,
-            expiredDirList, bucket);
+    @SuppressWarnings({"checkstyle:parameternumber", 
"checkstyle:MethodLength"})
+    private void evaluateKeyAndDirTable(OmBucketInfo bucket, long volumeObjId, 
Table<String, OmKeyInfo> keyTable,
+        String directoryPath, @Nullable OmDirectoryInfo dir, List<OmLCRule> 
ruleList, LimitedExpiredObjectList keyList,
+        LimitedExpiredObjectList dirList) {
+      String volumeName = bucket.getVolumeName();
+      String bucketName = bucket.getBucketName();
+      LimitedSizeStack stack = new LimitedSizeStack(cachedDirMaxCount);
+      try {
+        if (dir != null) {
+          stack.push(new PendingEvaluateDirectory(dir, directoryPath, null));
+        } else {
+          // put a placeholder PendingEvaluateDirectory to stack for bucket
+          stack.push(new PendingEvaluateDirectory(null, "", null));
+        }
+      } catch (CapacityFullException e) {
+        LOG.warn("Abort evaluate {}/{} at {}", volumeName, bucketName, 
directoryPath != null ? directoryPath : "", e);
+        return;
       }
 
-      for (OmLCRule rule : nonDirectoryStylePrefixRuleList) {
-        // find the directory for the prefix, it may not exist
-        OmDirectoryInfo dirInfo = getDirectory(volume, bucket, 
rule.getEffectivePrefix(), bucketKey);
-        String prefix = OM_KEY_PREFIX + volume.getObjectID() +
-            OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
-        if (dirInfo != null) {
-          if (!dirInfo.getName().equals(TRASH_PREFIX)) {
-            prefix += dirInfo.getObjectID();
-            if (dirInfo.getName().equals(rule.getEffectivePrefix()) && 
rule.match(dirInfo, dirInfo.getName())) {
-              if (expiredDirList.isFull()) {
-                // if expiredDirList is full, send delete/rename request for 
expired directories
-                handleAndClearFullList(bucket, expiredDirList, true);
+      HashSet<Long> deletedDirSet = new HashSet<>();
+      while (!stack.isEmpty()) {
+        PendingEvaluateDirectory item = stack.pop();
+        OmDirectoryInfo currentDir = item.getDirectoryInfo();
+        String currentDirPath = item.getDirPath();
+        long currentDirObjID = currentDir == null ? bucket.getObjectID() : 
currentDir.getObjectID();
+
+        // use current directory's object ID to iterate the keys and 
directories under it
+        String prefix =
+            OM_KEY_PREFIX + volumeObjId + OM_KEY_PREFIX + bucket.getObjectID() 
+ OM_KEY_PREFIX + currentDirObjID;
+        LOG.debug("Prefix {} for {}/{}", prefix, bucket.getVolumeName(), 
bucket.getBucketName());
+
+        // get direct sub directories
+        SubDirectorySummary subDirSummary = item.getSubDirSummary();
+        boolean newSubDirPushed = false;
+        long deletedDirCount = 0;
+        if (subDirSummary == null) {
+          try {
+            subDirSummary = getSubDirectory(currentDirObjID, prefix, 
omMetadataManager);
+          } catch (IOException e) {
+            // log failure, continue to process other directories in stack
+            LOG.warn("Failed to get sub directories of {} under {}/{}", 
currentDirPath, volumeName, bucketName, e);
+            continue;
+          }
+
+          // filter sub directory list
+          if (!subDirSummary.getSubDirList().isEmpty()) {
+            Iterator<OmDirectoryInfo> iterator = 
subDirSummary.getSubDirList().iterator();
+            while (iterator.hasNext()) {
+              OmDirectoryInfo subDir = iterator.next();
+              String subDirPath = currentDirPath.isEmpty() ? subDir.getName() :
+                  currentDirPath + OM_KEY_PREFIX + subDir.getName();
+              if (subDirPath.startsWith(TRASH_PREFIX)) {
+                iterator.remove();
+              }
+              boolean matched = false;
+              for (OmLCRule rule : ruleList) {
+                if (rule.getEffectivePrefix() != null && 
subDirPath.startsWith(rule.getEffectivePrefix())) {
+                  matched = true;
+                  break;
+                }
+              }
+              if (!matched) {
+                iterator.remove();
+              }
+            }
+          }
+
+          if (!subDirSummary.getSubDirList().isEmpty()) {
+            item.setSubDirSummary(subDirSummary);
+            try {
+              stack.push(item);
+            } catch (CapacityFullException e) {
+              LOG.warn("Abort evaluate {}/{} at {}", volumeName, bucketName, 
currentDirPath, e);
+              return;
+            }
+
+            // depth first evaluation, push subDirs into stack
+            for (OmDirectoryInfo subDir : subDirSummary.getSubDirList()) {
+              String subDirPath = currentDirPath.isEmpty() ? subDir.getName() :
+                  currentDirPath + OM_KEY_PREFIX + subDir.getName();
+              try {
+                stack.push(new PendingEvaluateDirectory(subDir, subDirPath, 
null));
+              } catch (CapacityFullException e) {
+                LOG.warn("Abort evaluate {}/{} at {}", volumeName, bucketName, 
subDirPath, e);
+                return;
+              }
+            }
+            newSubDirPushed = true;
+          }
+        } else {
+          // this item is a parent directory, check how many sub directories 
are deleted.
+          for (OmDirectoryInfo subDir : subDirSummary.getSubDirList()) {
+            if (deletedDirSet.remove(subDir.getObjectID())) {
+              deletedDirCount++;
+            }
+          }
+        }
+
+        if (newSubDirPushed) {
+          continue;
+        }
+
+        // evaluate direct files, first check cache, then check table
+        // there are three cases:
+        // a. key is deleted in cache, while it's not deleted in table yet
+        // b. key is new added in cache, not in table yet
+        // c. key is updated in cache(rename), but not updated in table yet
+        //    in this case, the fromKey is a deleted key in cache, and the 
toKey is a newly added key in cache,
+        //    and fromKey is also in table
+        long numKeysUnderDir = 0;
+        long numKeysExpired = 0;
+        HashSet<String> deletedKeySetInCache = new HashSet();
+        HashSet<String> keySetInCache = new HashSet();
+        Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> cacheIter 
= keyTable.cacheIterator();
+        while (cacheIter.hasNext()) {
+          Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>> entry = 
cacheIter.next();
+          OmKeyInfo key = entry.getValue().getCacheValue();
+          if (key == null) {
+            deletedKeySetInCache.add(entry.getKey().getCacheKey());
+            continue;
+          }
+          if (key.getParentObjectID() == currentDirObjID) {
+            numKeysUnderDir++;
+            keySetInCache.add(entry.getKey().getCacheKey());
+            String keyPath = currentDirPath.isEmpty() ? key.getKeyName() :
+                currentDirPath + OM_KEY_PREFIX + key.getKeyName();
+            for (OmLCRule rule : ruleList) {
+              if (rule.match(key, keyPath)) {
+                // mark key as expired, check next key
+                if (keyList.isFull()) {
+                  // if keyList is full, send delete/rename request for 
expired keys
+                  handleAndClearFullList(bucket, keyList, false);
+                }
+                keyList.add(keyPath, key.getReplicatedSize(), 
key.getUpdateID());
+                numKeysExpired++;
+                break;
               }
-              expiredDirList.add(dirInfo.getName(), 0, dirInfo.getUpdateID());
             }
-          } else {
-            dirInfo = null;
-            LOG.info("Skip evaluate trash directory {}", TRASH_PREFIX);
           }
         }
-        LOG.info("Prefix {} for {}", prefix, bucketKey);
-        evaluateKeyTable(keyTable, prefix, dirInfo == null ? "" : 
dirInfo.getName() + OzoneConsts.OM_KEY_PREFIX,
-            rule, expiredKeyList, bucket);
-        evaluateDirTable(directoryInfoTable, prefix,
-            dirInfo == null ? "" : dirInfo.getName() + 
OzoneConsts.OM_KEY_PREFIX, rule, expiredDirList, bucket);
-      }
 
-      if (!noPrefixRuleList.isEmpty()) {
-        String prefix = OM_KEY_PREFIX + volume.getObjectID() +
-            OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
-        LOG.info("prefix {} for {}", prefix, bucketKey);
-        // use bucket name as key iterator prefix
         try (TableIterator<String, ? extends Table.KeyValue<String, 
OmKeyInfo>> keyTblItr =
                  keyTable.iterator(prefix)) {
           while (keyTblItr.hasNext()) {
             Table.KeyValue<String, OmKeyInfo> keyValue = keyTblItr.next();
             OmKeyInfo key = keyValue.getValue();
+            String keyPath = currentDirPath.isEmpty() ? key.getKeyName() :
+                currentDirPath + OM_KEY_PREFIX + key.getKeyName();
             numKeyIterated++;
-            for (OmLCRule rule : noPrefixRuleList) {
-              if (rule.match(key)) {
+            if (deletedKeySetInCache.remove(keyValue.getKey()) || 
keySetInCache.remove(keyValue.getKey())) {
+              continue;
+            }
+            numKeysUnderDir++;
+            for (OmLCRule rule : ruleList) {
+              if (key.getParentObjectID() == currentDirObjID && 
rule.match(key, keyPath)) {
                 // mark key as expired, check next key
-                if (expiredKeyList.isFull()) {
-                  // if expiredKeyList is full, send delete/rename request for 
expired keys
-                  handleAndClearFullList(bucket, expiredKeyList, false);
+                if (keyList.isFull()) {
+                  // if keyList is full, send delete request for pending 
deletion keys
+                  handleAndClearFullList(bucket, keyList, false);
                 }
-                expiredKeyList.add(key.getKeyName(), key.getReplicatedSize(), 
key.getUpdateID());
-                break;
+                keyList.add(keyPath, key.getReplicatedSize(), 
key.getUpdateID());
+                numKeysExpired++;
               }
             }
           }
         } catch (IOException e) {
-          // log failure and continue the process to delete/move files already 
identified in this run
-          LOG.warn("Failed to iterate keyTable for bucket {}", bucketKey, e);
+          // log failure and continue the process other directories in stack
+          LOG.warn("Failed to iterate keyTable for bucket {}/{}", volumeName, 
bucketName, e);
+          continue;
         }
 
-        try (TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>> dirTblItr =
-                 directoryInfoTable.iterator(prefix)) {
-          while (dirTblItr.hasNext()) {
-            Table.KeyValue<String, OmDirectoryInfo> entry = dirTblItr.next();
-            OmDirectoryInfo dir = entry.getValue();
-            numDirIterated++;
-            // skip TRASH_PREFIX directory
-            if (dir.getName().equals(TRASH_PREFIX)) {
-              continue;
-            }
-            for (OmLCRule rule : noPrefixRuleList) {
-              if (rule.match(dir, dir.getPath())) {
-                // mark directory as expired, check next directory
-                if (expiredDirList.isFull()) {
-                  // if expiredDirList is full, send delete/rename request for 
expired directories
-                  handleAndClearFullList(bucket, expiredDirList, true);
-                }
-                expiredDirList.add(dir.getPath(), 0, dir.getUpdateID());
-                break;
+        // if this directory is empty or all files/subDirs are expired, 
evaluate itself
+        if ((numKeysUnderDir == 0 && subDirSummary.getSubDirCount() == 0) ||
+            (numKeysUnderDir == numKeysExpired && deletedDirCount == 
subDirSummary.getSubDirCount())) {
+          for (OmLCRule rule : ruleList) {
+            String path = (rule.getEffectivePrefix() != null && 
rule.getEffectivePrefix().endsWith(OM_KEY_PREFIX)) ?
+                currentDirPath + OM_KEY_PREFIX : currentDirPath;
+            if (currentDir != null && rule.match(currentDir, path)) {
+              if (dirList.isFull()) {
+                // if expiredDirList is full, send delete request for pending 
deletion directories
+                handleAndClearFullList(bucket, dirList, true);
               }
+              dirList.add(currentDirPath, 0, currentDir.getUpdateID());
+              deletedDirSet.add(currentDir.getObjectID());
             }
           }
-        } catch (IOException e) {
-          // log failure and continue the process to delete/move directories 
already identified in this run
-          LOG.warn("Failed to iterate directoryTable for bucket {}", 
bucketKey, e);
         }
       }
     }
 
-    private void evaluateKeyTable(Table<String, OmKeyInfo> keyTable, String 
prefix, String directoryPath,
-        OmLCRule rule, LimitedExpiredObjectList keyList, OmBucketInfo bucket) {
-      String volumeName = bucket.getVolumeName();
-      String bucketName = bucket.getBucketName();
-      try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyTblItr =
-               keyTable.iterator(prefix)) {
-        while (keyTblItr.hasNext()) {
-          Table.KeyValue<String, OmKeyInfo> keyValue = keyTblItr.next();
-          OmKeyInfo key = keyValue.getValue();
-          String keyPath = directoryPath + key.getKeyName();
-          numKeyIterated++;
-          if (rule.match(key, keyPath)) {
-            // mark key as expired, check next key
-            if (keyList.isFull()) {
-              // if keyList is full, send delete/rename request for expired 
keys
-              handleAndClearFullList(bucket, keyList, false);
-            }
-            keyList.add(keyPath, key.getReplicatedSize(), key.getUpdateID());
-          }
+    private SubDirectorySummary getSubDirectory(long dirObjID, String prefix, 
OMMetadataManager metaMgr)
+        throws IOException {
+      SubDirectorySummary subDirList = new SubDirectorySummary();
+
+      // Check all dirTable cache for any sub paths.
+      Table dirTable = metaMgr.getDirectoryTable();
+      Iterator<Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>>>
+          cacheIter = dirTable.cacheIterator();
+      HashSet<String> deletedDirSet = new HashSet();
+      while (cacheIter.hasNext()) {
+        Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>> entry =
+            cacheIter.next();
+        numDirIterated++;
+        OmDirectoryInfo cacheOmDirInfo = entry.getValue().getCacheValue();
+        if (cacheOmDirInfo == null) {
+          deletedDirSet.add(entry.getKey().getCacheKey());
+          continue;
+        }
+        if (cacheOmDirInfo.getParentObjectID() == dirObjID) {
+          subDirList.addSubDir(cacheOmDirInfo);
         }
-      } catch (IOException e) {
-        // log failure and continue the process to delete/move files already 
identified in this run
-        LOG.warn("Failed to iterate keyTable for bucket {}/{}", volumeName, 
bucketName, e);
       }
-    }
 
-    private void evaluateDirTable(Table<String, OmDirectoryInfo> 
directoryInfoTable, String prefix,
-        String directoryPath, OmLCRule rule, LimitedExpiredObjectList dirList, 
OmBucketInfo bucket) {
-      String volumeName = bucket.getVolumeName();
-      String bucketName = bucket.getBucketName();
-      try (TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>> dirTblItr =
-               directoryInfoTable.iterator(prefix)) {
-        while (dirTblItr.hasNext()) {
-          Table.KeyValue<String, OmDirectoryInfo> entry = dirTblItr.next();
-          OmDirectoryInfo dir = entry.getValue();
+      // Check dirTable entries for any sub paths.
+      try (TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>>
+               iterator = dirTable.iterator(prefix)) {
+        while (iterator.hasNext()) {
           numDirIterated++;
-          // skip TRASH_PREFIX directory
-          if (dir.getName().equals(TRASH_PREFIX)) {
+          Table.KeyValue<String, OmDirectoryInfo> entry = iterator.next();
+          OmDirectoryInfo dir = entry.getValue();
+          if (deletedDirSet.contains(entry.getKey())) {
             continue;
           }
-          String dirPath = directoryPath + dir.getName();
-          if (rule.match(dir, dirPath)) {
-            // mark dir as expired, check next key
-            if (dirList.isFull()) {
-              // if dirList is full, send delete/rename request for expired 
directories
-              handleAndClearFullList(bucket, dirList, true);
-            }
-            dirList.add(dirPath, 0, dir.getUpdateID());
+          if (dir.getParentObjectID() == dirObjID) {
+            subDirList.addSubDir(dir);
           }
         }
-      } catch (IOException e) {
-        // log failure and continue the process to delete/move files already 
identified in this run
-        LOG.warn("Failed to iterate directoryTable for bucket {}/{}", 
volumeName, bucketName, e);
       }
+      return subDirList;
     }
 
     private void evaluateBucket(OmBucketInfo bucketInfo,
@@ -577,17 +718,6 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
       }
     }
 
-    private OmDirectoryInfo getDirectory(OmVolumeArgs volume, OmBucketInfo 
bucket, String prefix, String bucketKey) {
-      String dbDirName = omMetadataManager.getOzonePathKey(
-          volume.getObjectID(), bucket.getObjectID(), bucket.getObjectID(), 
prefix);
-      try {
-        return omMetadataManager.getDirectoryTable().get(dbDirName);
-      } catch (IOException e) {
-        LOG.info("Failed to get directory object of {} for bucket {}", 
dbDirName, bucketKey);
-        return null;
-      }
-    }
-
     /**
      * If prefix is /dir1/dir2, but dir1 doesn't exist, then it will return 
exception.
      * If prefix is /dir1/dir2, but dir2 doesn't exist, then it will return a 
list with dir1 only.
@@ -641,13 +771,13 @@ private void onSuccess(String bucketName) {
       metrics.incNumKeyIterated(numKeyIterated);
       metrics.incNumDirIterated(numDirIterated);
       LOG.info("Spend {} ms on bucket {} to iterate {} keys and {} dirs, 
deleted {} keys with {} bytes, " +
-          "and {} dirs, renamed {} keys with {} bytes to trash", timeSpent, 
bucketName, numKeyIterated, numDirIterated,
-          numKeyDeleted, sizeKeyDeleted, numDirDeleted, numKeyRenamed, 
sizeKeyRenamed);
+          "and {} dirs, renamed {} keys with {} bytes, and {} dirs to trash", 
timeSpent, bucketName, numKeyIterated,
+          numDirIterated, numKeyDeleted, sizeKeyDeleted, numDirDeleted, 
numKeyRenamed, sizeKeyRenamed, numDirRenamed);
     }
 
     private void handleAndClearFullList(OmBucketInfo bucket, 
LimitedExpiredObjectList keysList, boolean dir) {
       if (bucket.getBucketLayout() != OBJECT_STORE && ozoneTrash != null) {
-        moveKeysToTrash(bucket, keysList);
+        moveToTrash(bucket, keysList, dir);
       } else {
         sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), 
bucket.getBucketName(), keysList, dir);
       }
@@ -748,16 +878,16 @@ private void sendDeleteKeysRequestAndClearList(String 
volume, String bucket,
       }
     }
 
-    private void moveKeysToTrash(OmBucketInfo bucket, LimitedExpiredObjectList 
keysList) {
+    private void moveToTrash(OmBucketInfo bucket, LimitedExpiredObjectList 
keysList, boolean isDir) {
       if (keysList.isEmpty()) {
         return;
       }
       String volumeName = bucket.getVolumeName();
       String bucketName = bucket.getBucketName();
-      String trashCurrent;
-      UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(bucket.getOwner());
+      String trashRoot = TRASH_PREFIX + OM_KEY_PREFIX + bucket.getOwner();
+      Path trashCurrent = new Path(trashRoot, CURRENT);
       try {
-        trashCurrent = checkAndCreateTrashDirectoryIfNeeded(bucket, ugi);
+        checkAndCreateTrashDirIfNeeded(bucket, trashCurrent);
       } catch (IOException e) {
         keysList.clear();
         return;
@@ -765,6 +895,15 @@ private void moveKeysToTrash(OmBucketInfo bucket, 
LimitedExpiredObjectList keysL
 
       for (int i = 0; i < keysList.size(); i++) {
         String keyName = keysList.getName(i);
+        Path keyPath = new Path(OzoneConsts.OZONE_URI_DELIMITER + keyName);
+        Path baseKeyTrashPath = Path.mergePaths(trashCurrent, 
keyPath.getParent());
+        try {
+          checkAndCreateTrashDirIfNeeded(bucket, baseKeyTrashPath);
+        } catch (IOException e) {
+          LOG.error("Failed to check and create Trash dir {} for bucket 
{}/{}", baseKeyTrashPath,
+              volumeName, bucketName, e);
+          continue;
+        }
         String targetKeyName = trashCurrent + OM_KEY_PREFIX + keyName;
         KeyArgs keyArgs = KeyArgs.newBuilder().setKeyName(keyName)
             .setVolumeName(volumeName).setBucketName(bucketName).build();
@@ -790,6 +929,7 @@ private void moveKeysToTrash(OmBucketInfo bucket, 
LimitedExpiredObjectList keysL
         try {
           // perform preExecute as ratis submit do no perform preExecute
           OMClientRequest omClientRequest = 
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+          UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(bucket.getOwner());
           OzoneManagerProtocolProtos.OMResponse omResponse =
               ugi.doAs(new 
PrivilegedExceptionAction<OzoneManagerProtocolProtos.OMResponse>() {
                 @Override
@@ -806,35 +946,37 @@ public OzoneManagerProtocolProtos.OMResponse run() throws 
Exception {
               continue;
             }
           }
-          LOG.debug("RenameKey request succeed with source key: {}, dest key: 
{}", keyName, targetKeyName);
-          numKeyRenamed += 1;
-          sizeKeyRenamed += keysList.getReplicatedSize(i);
-          metrics.incrNumKeyRenamed(1);
-          metrics.incrSizeKeyRenamed(keysList.getReplicatedSize(i));
-        } catch (InterruptedException | IOException e) {
+          LOG.info("RenameKey request succeed with source key: {}, dest key: 
{}", keyName, targetKeyName);
+
+          if (isDir) {
+            numDirRenamed += 1;
+            metrics.incrNumDirRenamed(1);
+          } else {
+            numKeyRenamed += 1;
+            sizeKeyRenamed += keysList.getReplicatedSize(i);
+            metrics.incrNumKeyRenamed(1);
+            metrics.incrSizeKeyRenamed(keysList.getReplicatedSize(i));
+          }
+        } catch (IOException | InterruptedException e) {
           LOG.error("Failed to send RenameKeysRequest", e);
         }
       }
       keysList.clear();
     }
 
-    private String checkAndCreateTrashDirectoryIfNeeded(OmBucketInfo bucket, 
UserGroupInformation ugi)
-        throws IOException {
-      String userTrashRoot = TRASH_PREFIX + OM_KEY_PREFIX + bucket.getOwner();
-      String userTrashCurrent = userTrashRoot + OM_KEY_PREFIX + CURRENT;
+    private void checkAndCreateTrashDirIfNeeded(OmBucketInfo bucket, Path 
dirPath) throws IOException {
+      OmKeyArgs key = new 
OmKeyArgs.Builder().setVolumeName(bucket.getVolumeName())
+          .setBucketName(bucket.getBucketName()).setKeyName(dirPath.toString())
+          .setOwnerName(bucket.getOwner()).build();
       try {
-        OmKeyArgs key = new 
OmKeyArgs.Builder().setVolumeName(bucket.getVolumeName())
-            .setBucketName(bucket.getBucketName()).setKeyName(userTrashCurrent)
-            .setOwnerName(bucket.getOwner()).build();
         ozoneManager.getFileStatus(key);
-        return userTrashCurrent;
       } catch (IOException e) {
         if (e instanceof OMException &&
             (((OMException) e).getResult() == 
OMException.ResultCodes.FILE_NOT_FOUND ||
                 ((OMException) e).getResult() == 
OMException.ResultCodes.DIRECTORY_NOT_FOUND)) {
           // create the trash/Current directory for user
           KeyArgs keyArgs = 
KeyArgs.newBuilder().setVolumeName(bucket.getVolumeName())
-              
.setBucketName(bucket.getBucketName()).setKeyName(userTrashCurrent)
+              
.setBucketName(bucket.getBucketName()).setKeyName(dirPath.toString())
               .setOwnerName(bucket.getOwner()).setRecursive(true).build();
           OMRequest omRequest = 
OMRequest.newBuilder().setCreateDirectoryRequest(
                   CreateDirectoryRequest.newBuilder().setKeyArgs(keyArgs))
@@ -845,6 +987,7 @@ private String 
checkAndCreateTrashDirectoryIfNeeded(OmBucketInfo bucket, UserGro
           try {
             // perform preExecute as ratis submit do no perform preExecute
             final OMClientRequest omClientRequest = 
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+            UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(bucket.getOwner());
             OzoneManagerProtocolProtos.OMResponse omResponse =
                 ugi.doAs(new 
PrivilegedExceptionAction<OzoneManagerProtocolProtos.OMResponse>() {
                   @Override
@@ -858,19 +1001,19 @@ public OzoneManagerProtocolProtos.OMResponse run() 
throws Exception {
             if (omResponse != null) {
               if (!omResponse.getSuccess()) {
                 LOG.error("CreateDirectory request failed with {}, path: {}",
-                    omResponse.getMessage(), userTrashCurrent);
-                throw new IOException("Failed to create trash directory " + 
userTrashCurrent);
+                    omResponse.getMessage(), dirPath);
+                throw new IOException("Failed to create trash directory " + 
dirPath);
               }
-              LOG.debug("Created trash current directory: {}", 
userTrashCurrent);
-              return userTrashCurrent;
             }
+            LOG.info("Created directory {}/{}/{}", bucket.getVolumeName(), 
bucket.getBucketName(), dirPath);
           } catch (InterruptedException | IOException e1) {
-            LOG.error("Failed to send CreateDirectoryRequest for {}", 
userTrashCurrent, e1);
-            throw new IOException("Failed to send CreateDirectoryRequest 
request for " + userTrashCurrent);
+            LOG.error("Failed to send CreateDirectoryRequest for {}", dirPath, 
e1);
+            throw new IOException("Failed to send CreateDirectoryRequest 
request for " + dirPath);
           }
+        } else {
+          LOG.error("Failed to get trash current directory {} status", 
dirPath, e);
+          throw e;
         }
-        LOG.error("Failed to get trash current directory {} status", 
userTrashCurrent, e);
-        throw e;
       }
     }
   }
@@ -1020,4 +1163,98 @@ public void clear() {
       internalList.clear();
     }
   }
+
+  /**
+   * An in-memory class to hold the information required in directory 
recursive evaluation.
+   */
+  public static class PendingEvaluateDirectory {
+    private final OmDirectoryInfo directoryInfo;
+    private final String dirPath;
+    private SubDirectorySummary subDirSummary;
+
+    public PendingEvaluateDirectory(OmDirectoryInfo dir, String path, 
SubDirectorySummary summary) {
+      this.directoryInfo = dir;
+      this.dirPath = path;
+      this.subDirSummary = summary;
+    }
+
+    public String getDirPath() {
+      return dirPath;
+    }
+
+    public OmDirectoryInfo getDirectoryInfo() {
+      return directoryInfo;
+    }
+
+    public SubDirectorySummary getSubDirSummary() {
+      return subDirSummary;
+    }
+
+    public void setSubDirSummary(SubDirectorySummary summary) {
+      subDirSummary = summary;
+    }
+  }
+
+  /**
+   * An in-memory class to hold sub directory summary.
+   */
+  public static class SubDirectorySummary {
+    private final List<OmDirectoryInfo> subDirList;
+    private long subDirCount;
+
+    public SubDirectorySummary() {
+      this.subDirList = new ArrayList<>();
+      this.subDirCount = 0;
+    }
+
+    public long getSubDirCount() {
+      return subDirCount;
+    }
+
+    public List<OmDirectoryInfo> getSubDirList() {
+      return subDirList;
+    }
+
+    public void addSubDir(OmDirectoryInfo dir) {
+      subDirList.add(dir);
+      subDirCount++;
+    }
+  }
+
+  /**
+   * An in-memory stack with a maximum size. This class is not thread safe.
+   */
+  public static class LimitedSizeStack {
+    private final Deque<PendingEvaluateDirectory> stack;
+    private final long maxSize;
+
+    public LimitedSizeStack(long maxSize) {
+      this.maxSize = maxSize;
+      this.stack = new ArrayDeque<>();
+    }
+
+    public boolean isEmpty() {
+      return stack.isEmpty();
+    }
+
+    public void push(PendingEvaluateDirectory e) throws CapacityFullException {
+      if (stack.size() >= maxSize) {
+        throw new CapacityFullException("LimitedSizeStack has reached maximum 
size " + maxSize);
+      }
+      stack.push(e);
+    }
+
+    public PendingEvaluateDirectory pop() {
+      return stack.pop();
+    }
+  }
+
+  /**
+   * An exception which indicates the collection is full.
+   */
+  public static class CapacityFullException extends Exception {
+    public CapacityFullException(String message) {
+      super(message);
+    }
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
index 77efe4f2287..874b133777b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
@@ -81,6 +81,8 @@ public static void unregister() {
   private MutableGaugeLong numKeyDeleted;
   @Metric("Total keys renamed")
   private MutableGaugeLong numKeyRenamed;
+  @Metric("Total directories renamed")
+  private MutableGaugeLong numDirRenamed;
   @Metric("Total size of keys deleted")
   private MutableGaugeLong sizeKeyDeleted;
   @Metric("Total size of keys renamed")
@@ -110,6 +112,10 @@ public void incrNumKeyRenamed(long keyCount) {
     numKeyRenamed.incr(keyCount);
   }
 
+  public void incrNumDirRenamed(long dirCount) {
+    numDirRenamed.incr(dirCount);
+  }
+
   public void incrSizeKeyDeleted(long size) {
     sizeKeyDeleted.incr(size);
   }
@@ -130,6 +136,10 @@ public MutableGaugeLong getNumKeyRenamed() {
     return numKeyRenamed;
   }
 
+  public MutableGaugeLong getNumDirRenamed() {
+    return numDirRenamed;
+  }
+
   public MutableGaugeLong getSizeKeyDeleted() {
     return sizeKeyDeleted;
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
index 4a665a28ad0..cb54173266b 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
@@ -27,12 +27,14 @@
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OmConfig.Keys.ENABLE_FILESYSTEM_PATHS;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
 import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
 import static 
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -47,6 +49,7 @@
 import java.security.PrivilegedExceptionAction;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -171,6 +174,7 @@ private void createConfig(File testDir) {
     conf.setTimeDuration(OZONE_KEY_LIFECYCLE_SERVICE_INTERVAL, 
SERVICE_INTERVAL, TimeUnit.MILLISECONDS);
     conf.setInt(OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE, 50);
     conf.setQuietMode(false);
+    conf.setBoolean(ENABLE_FILESYSTEM_PATHS, false);
     OmLCExpiration.setTest(true);
   }
 
@@ -1237,8 +1241,9 @@ void testPerformanceWithNestedDir(BucketLayout 
bucketLayout, String prefix)
       GenericTestUtils.waitFor(() -> metrics.getNumKeyDeleted().value() - 
initialKeyDeleted == keyCount,
           WAIT_CHECK_INTERVAL, 5000);
       assertEquals(0, metrics.getNumDirIterated().value() - 
initialDirIterated);
-      assertEquals(bucketLayout == FILE_SYSTEM_OPTIMIZED ? 1 : 0,
-          metrics.getNumDirDeleted().value() - initialDirDeleted);
+      GenericTestUtils.waitFor(() ->
+              metrics.getNumDirDeleted().value() - initialDirDeleted == 
(bucketLayout == FILE_SYSTEM_OPTIMIZED ? 1 : 0),
+          WAIT_CHECK_INTERVAL, 10000);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
@@ -1308,16 +1313,24 @@ void testListMaxSize(BucketLayout bucketLayout, boolean 
enableTrash) throws IOEx
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
+    public Stream<Arguments> parameters7() {
+      return Stream.of(
+          arguments("FILE_SYSTEM_OPTIMIZED", "key"),
+          arguments("FILE_SYSTEM_OPTIMIZED", "dir/key"),
+          arguments("LEGACY", "key"),
+          arguments("LEGACY", "dir/key"));
+    }
+
     @ParameterizedTest
-    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "LEGACY"})
-    void testMoveToTrash(BucketLayout bucketLayout) throws IOException,
+    @MethodSource("parameters7")
+    void testMoveToTrash(BucketLayout bucketLayout, String prefix) throws 
IOException,
         TimeoutException, InterruptedException {
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
-      String prefix = "key";
       long initialDeletedKeyCount = getDeletedKeyCount();
       long initialKeyCount = getKeyCount(bucketLayout);
       long initialRenamedKeyCount = metrics.getNumKeyRenamed().value();
+      long initialRenamedDirCount = metrics.getNumDirRenamed().value();
       // create keys
       String bucketOwner = 
UserGroupInformation.getCurrentUser().getShortUserName() + "-test";
       List<OmKeyArgs> keyList =
@@ -1344,6 +1357,12 @@ void testMoveToTrash(BucketLayout bucketLayout) throws 
IOException,
       GenericTestUtils.waitFor(() ->
           (metrics.getNumKeyRenamed().value() - initialRenamedKeyCount) == 
KEY_COUNT, WAIT_CHECK_INTERVAL, 50000);
       assertEquals(0, getDeletedKeyCount() - initialDeletedKeyCount);
+      if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
+        // Legacy bucket doesn't have dir concept
+        GenericTestUtils.waitFor(() ->
+                metrics.getNumDirRenamed().value() - initialRenamedDirCount == 
(prefix.contains(OM_KEY_PREFIX) ? 1 : 0),
+            WAIT_CHECK_INTERVAL, 5000);
+      }
       deleteLifecyclePolicy(volumeName, bucketName);
       // verify trash directory has the right native ACLs
       List<KeyInfoWithVolumeContext> dirList = new ArrayList<>();
@@ -1378,7 +1397,7 @@ void testMoveToTrash(BucketLayout bucketLayout) throws 
IOException,
         assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
       } else {
         // For legacy bucket, trash directories along .Trash/user-test/Current 
are in key table too.
-        assertEquals(KEY_COUNT + 3, getKeyCount(bucketLayout) - 
initialKeyCount);
+        assertEquals(KEY_COUNT + (prefix.contains(OM_KEY_PREFIX) ? 4 : 3), 
getKeyCount(bucketLayout) - initialKeyCount);
       }
       // create new policy to test rule with prefix ".Trash/" is ignored 
during lifecycle evaluation
       now = ZonedDateTime.now(ZoneOffset.UTC);
@@ -1401,6 +1420,15 @@ void testMoveToTrash(BucketLayout bucketLayout) throws 
IOException,
           () -> log.getOutput().contains("Skip evaluate trash directory " + 
TRASH_PREFIX), WAIT_CHECK_INTERVAL, 5000);
       deleteLifecyclePolicy(volumeName, bucketName);
 
+      // create new policy to test rule with prefix ".Tras" is ignored during 
lifecycle evaluation
+      now = ZonedDateTime.now(ZoneOffset.UTC);
+      date = now.plusSeconds(EXPIRE_SECONDS);
+      createLifecyclePolicy(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, 
".Tras", null, date.toString(), true);
+
+      GenericTestUtils.waitFor(
+          () -> log.getOutput().contains("Skip evaluate trash directory " + 
TRASH_PREFIX), WAIT_CHECK_INTERVAL, 5000);
+      deleteLifecyclePolicy(volumeName, bucketName);
+
       // create new policy to test trash directory is skipped during lifecycle 
evaluation
       now = ZonedDateTime.now(ZoneOffset.UTC);
       date = now.plusSeconds(EXPIRE_SECONDS);
@@ -1410,6 +1438,104 @@ void testMoveToTrash(BucketLayout bucketLayout) throws 
IOException,
           () -> log.getOutput().contains("No expired keys/dirs found/remained 
for bucket"), WAIT_CHECK_INTERVAL, 5000);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
+
+    public Stream<Arguments> parameters8() {
+      return Stream.of(
+          arguments("dir1/dir2/dir3/key", null, "dir1/dir", "dir1/dir2/dir3", 
KEY_COUNT, 2, false),
+          arguments("dir1/dir2/dir3/key", null, "dir1/dir", "dir1/dir2/dir3", 
KEY_COUNT, 0, true),
+          arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "dir1/dir", 
"dir1/dir2/dir3", KEY_COUNT * 2, 4, false),
+          arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "dir1/dir", 
"dir1/dir2/dir3", KEY_COUNT * 2, 2, true),
+          arguments("dir1/dir2/dir3/key", "dir1/dir22/dir5/key", "dir1/dir2/", 
"dir1/dir2/dir3", KEY_COUNT, 2, false),
+          arguments("dir1/dir2/dir3/key", "dir1/dir22/dir5/key", "dir1/dir2/", 
"dir1/dir2/dir3", KEY_COUNT, 0, true),
+          arguments("dir1/dir2/dir3/key", "dir1/dir22/dir5/key", "dir1/dir2", 
"dir1/dir2/dir3",
+              KEY_COUNT * 2, 4, false),
+          arguments("dir1/dir2/dir3/key", "dir1/dir22/dir5/key", "dir1/dir2", 
"dir1/dir2/dir3", KEY_COUNT * 2, 2, true),
+          arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "dir", 
"dir1/dir2/dir3", KEY_COUNT * 2, 5, false),
+          arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "dir", 
"dir1/dir2/dir3", KEY_COUNT * 2, 2, true),
+          arguments("dir1/dir2/dir3/key", "dir11/dir4/dir5/key", "dir1/", 
"dir1/dir2/dir3", KEY_COUNT, 3, false),
+          arguments("dir1/dir2/dir3/key", "dir11/dir4/dir5/key", "dir1/", 
"dir1/dir2/dir3", KEY_COUNT, 0, true),
+          arguments("dir1/dir2/dir3/key", "dir11/dir4/dir5/key", "dir1", 
"dir1/dir2/dir3", KEY_COUNT * 2, 6, false),
+          arguments("dir1/dir2/dir3/key", "dir11/dir4/dir5/key", "dir1", 
"dir1/dir2/dir3", KEY_COUNT * 2, 3, true),
+          arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "", 
"dir1/dir2/dir3", KEY_COUNT * 2, 5, false),
+          arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "", 
"dir1/dir2/dir3", KEY_COUNT * 2, 2, true));
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters8")
+    void testMultipleDirectoriesMatched(String keyPrefix1, String keyPrefix2, 
String rulePrefix, String dirName,
+        int expectedDeletedKeyCount, int expectedDeletedDirCount, boolean 
updateDirModificationTime)
+        throws IOException, TimeoutException, InterruptedException {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      long initialDeletedKeyCount = getDeletedKeyCount();
+      long initialDeletedDirCount = getDeletedDirectoryCount();
+      long initialKeyCount = getKeyCount(FILE_SYSTEM_OPTIMIZED);
+      // create keys
+      List<OmKeyArgs> keyList1 =
+          createKeys(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, KEY_COUNT, 
1, keyPrefix1, null);
+      // check there are keys in keyTable
+      assertEquals(KEY_COUNT, keyList1.size());
+      GenericTestUtils.waitFor(() -> getKeyCount(FILE_SYSTEM_OPTIMIZED) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
+
+      if (keyPrefix2 != null) {
+        List<OmKeyArgs> keyList2 = new ArrayList<>();
+        for (int x = 0; x < KEY_COUNT; x++) {
+          final String keyName = uniqueObjectName(keyPrefix2);
+          OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, 
keyName, 1, null);
+          keyList2.add(keyArg);
+        }
+        // check there are keys in keyTable
+        assertEquals(KEY_COUNT, keyList2.size());
+        GenericTestUtils.waitFor(() -> getKeyCount(FILE_SYSTEM_OPTIMIZED) - 
initialKeyCount == KEY_COUNT * 2,
+            WAIT_CHECK_INTERVAL, 1000);
+      }
+
+      // assert directory exists
+      KeyInfoWithVolumeContext keyInfo = getDirectory(volumeName, bucketName, 
dirName);
+      assertFalse(keyInfo.getKeyInfo().isFile());
+
+      KeyLifecycleService.setInjectors(
+          Arrays.asList(new FaultInjectorImpl(), new FaultInjectorImpl()));
+
+      // create Lifecycle configuration
+      ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+      ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+      createLifecyclePolicy(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, 
rulePrefix, null, date.toString(), true);
+      LOG.info("expiry date {}", date.toInstant());
+
+      ZonedDateTime endDate = date.plus(SERVICE_INTERVAL, ChronoUnit.MILLIS);
+      GenericTestUtils.waitFor(() -> 
endDate.isBefore(ZonedDateTime.now(ZoneOffset.UTC)), WAIT_CHECK_INTERVAL, 
10000);
+
+      // rename a key under directory to change directory's Modification time
+      if (updateDirModificationTime) {
+        writeClient.renameKey(keyList1.get(0), keyList1.get(0).getKeyName() + 
"-new");
+        LOG.info("Dir {} refreshes its modification time", dirName);
+        KeyInfoWithVolumeContext keyInfo2 = getDirectory(volumeName, 
bucketName, dirName);
+        assertNotEquals(keyInfo.getKeyInfo().getModificationTime(), 
keyInfo2.getKeyInfo().getModificationTime());
+      }
+
+      // resume KeyLifecycleService bucket scan
+      KeyLifecycleService.getInjector(0).resume();
+      KeyLifecycleService.getInjector(1).resume();
+
+      GenericTestUtils.waitFor(() ->
+          (getDeletedKeyCount() - initialDeletedKeyCount) == 
expectedDeletedKeyCount, WAIT_CHECK_INTERVAL, 10000);
+      if (keyPrefix2 == null) {
+        assertEquals(0, getKeyCount(FILE_SYSTEM_OPTIMIZED) - initialKeyCount);
+      } else {
+        assertEquals(KEY_COUNT * 2 - expectedDeletedKeyCount, 
getKeyCount(FILE_SYSTEM_OPTIMIZED) - initialKeyCount);
+      }
+      GenericTestUtils.waitFor(() -> getDeletedDirectoryCount() - 
initialDeletedDirCount == expectedDeletedDirCount,
+          WAIT_CHECK_INTERVAL, 10000);
+      if (updateDirModificationTime) {
+        KeyInfoWithVolumeContext directory = getDirectory(volumeName, 
bucketName, dirName);
+        assertNotNull(directory);
+      } else {
+        assertThrows(OMException.class, () -> getDirectory(volumeName, 
bucketName, dirName));
+      }
+      deleteLifecyclePolicy(volumeName, bucketName);
+    }
   }
 
   /**
@@ -1609,7 +1735,7 @@ void testUnsupportedPrefixForFSO(String prefix, boolean 
createPrefix) {
 
   private List<OmKeyArgs> createKeys(String volume, String bucket, 
BucketLayout bucketLayout,
       int keyCount, int numBlocks, String keyPrefix, Map<String, String> tags) 
throws IOException {
-    return  createKeys(volume, bucket, bucketLayout, 
UserGroupInformation.getCurrentUser().getShortUserName(),
+    return createKeys(volume, bucket, bucketLayout, 
UserGroupInformation.getCurrentUser().getShortUserName(),
         keyCount, numBlocks, keyPrefix, tags);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to