This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-8342
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-8342 by this push:
     new fa33739da3e HDDS-12786. Refine keys lifecycle deletion batch size 
(#8909)
fa33739da3e is described below

commit fa33739da3e27e626e001430e3cfab9eadb9da83
Author: Sammi Chen <[email protected]>
AuthorDate: Wed Aug 20 16:05:49 2025 +0800

    HDDS-12786. Refine keys lifecycle deletion batch size (#8909)
---
 .../common/src/main/resources/ozone-default.xml    |   2 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   2 +-
 .../hadoop/ozone/om/helpers/OmLCExpiration.java    |   6 +-
 .../ozone/om/service/KeyLifecycleService.java      | 312 ++++++++++++----
 .../ozone/om/service/TestKeyLifecycleService.java  | 407 +++++++++++++--------
 5 files changed, 490 insertions(+), 239 deletions(-)

diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 95ae5f34aae..3990f3b4898 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4693,7 +4693,7 @@
   </property>
   <property>
     <name>ozone.lifecycle.service.delete.batch-size</name>
-    <value>100000</value>
+    <value>1000</value>
     <tag>OZONE</tag>
     <description>Max numbers of objects allowed for deletion in a batch for a 
lifecycle evaluating task.</description>
   </property>
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 555fc93f6ee..c44a0cdf380 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -194,7 +194,7 @@ private OMConfigKeys() {
   public static final boolean OZONE_KEY_LIFECYCLE_SERVICE_ENABLED_DEFAULT = 
false;
   public static final String OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE =
       "ozone.lifecycle.service.delete.batch-size";
-  public static final int 
OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT = 100000;
+  public static final int 
OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT = 1000;
 
   /**
    * OM Ratis related configurations.
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCExpiration.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCExpiration.java
index a0b70c4b462..bb15b447691 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCExpiration.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmLCExpiration.java
@@ -62,10 +62,12 @@ public String getDate() {
   }
 
   public boolean isExpired(long timestamp) {
-    ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
     if (zonedDateTime != null) {
-      return now.isAfter(zonedDateTime);
+      Instant instant = Instant.ofEpochMilli(timestamp);
+      ZonedDateTime objectTime = instant.atZone(ZoneOffset.UTC);
+      return objectTime.isBefore(zonedDateTime);
     } else {
+      ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime dateTime =
           ZonedDateTime.ofInstant(Instant.ofEpochMilli(timestamp + 
daysInMilli), ZoneOffset.UTC);
       return now.isAfter(dateTime);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
index 231a88aff1a..f707979746b 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.om.service;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
@@ -79,8 +80,8 @@ public class KeyLifecycleService extends BackgroundService {
       LoggerFactory.getLogger(KeyLifecycleService.class);
 
   private final OzoneManager ozoneManager;
-  //TODO: honor this parameter in next patch
-  private int keyLimitPerIterator;
+  private int keyDeleteBatchSize;
+  private int listMaxSize;
   private final AtomicBoolean suspended;
   private KeyLifecycleServiceMetrics metrics;
   private boolean isServiceEnabled;
@@ -100,10 +101,11 @@ public KeyLifecycleService(OzoneManager ozoneManager,
     super(KeyLifecycleService.class.getSimpleName(), serviceInterval, 
TimeUnit.MILLISECONDS,
         poolSize, serviceTimeout, ozoneManager.getThreadNamePrefix());
     this.ozoneManager = ozoneManager;
-    this.keyLimitPerIterator = 
conf.getInt(OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE,
+    this.keyDeleteBatchSize = 
conf.getInt(OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE,
         OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT);
-    Preconditions.checkArgument(keyLimitPerIterator >= 0,
-        OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE + " cannot be 
negative.");
+    Preconditions.checkArgument(keyDeleteBatchSize > 0,
+        OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE + " should be a positive 
value.");
+    this.listMaxSize = keyDeleteBatchSize >= 10000 ? keyDeleteBatchSize : 
10000;
     this.suspended = new AtomicBoolean(false);
     this.metrics = KeyLifecycleServiceMetrics.create();
     this.isServiceEnabled = 
conf.getBoolean(OZONE_KEY_LIFECYCLE_SERVICE_ENABLED,
@@ -239,11 +241,8 @@ public BackgroundTaskResult call() {
         List<OmLCRule> ruleList = originRuleList.stream().filter(r -> 
r.isEnabled()).collect(Collectors.toList());
 
         // scan file or key table for evaluate rules against files or keys
-        List<String> expiredKeyNameList = new ArrayList<>();
-        List<String> expiredDirNameList = new ArrayList<>();
-        List<Long> expiredKeyUpdateIDList = new ArrayList<>();
-        List<Long> expiredDirUpdateIDList = new ArrayList<>();
-        // TODO: limit expired key size in each iterator
+        LimitedExpiredObjectList expiredKeyList = new 
LimitedExpiredObjectList(listMaxSize);
+        LimitedExpiredObjectList expiredDirList = new 
LimitedExpiredObjectList(listMaxSize);
         Table<String, OmKeyInfo> keyTable = 
omMetadataManager.getKeyTable(bucket.getBucketLayout());
         /**
          * Filter treatment.
@@ -274,37 +273,34 @@ public BackgroundTaskResult call() {
             onFailure(bucketKey);
             return result;
           }
-          evaluateFSOBucket(volume, bucket, bucketKey, keyTable, ruleList,
-              expiredKeyNameList, expiredKeyUpdateIDList, expiredDirNameList, 
expiredDirUpdateIDList);
+          evaluateFSOBucket(volume, bucket, bucketKey, keyTable, ruleList, 
expiredKeyList, expiredDirList);
         } else {
           // use bucket name as key iterator prefix
-          evaluateBucket(bucketKey, keyTable, ruleList, expiredKeyNameList, 
expiredKeyUpdateIDList);
+          evaluateBucket(bucket, keyTable, ruleList, expiredKeyList);
         }
 
-        if (expiredKeyNameList.isEmpty() && expiredDirNameList.isEmpty()) {
+        if (expiredKeyList.isEmpty() && expiredDirList.isEmpty()) {
           LOG.info("No expired keys/dirs found for bucket {}", bucketKey);
           onSuccess(bucketKey);
           return result;
         }
 
         LOG.info("{} expired keys and {} expired dirs found for bucket {}",
-            expiredKeyNameList.size(), expiredDirNameList.size(), bucketKey);
+            expiredKeyList.size(), expiredDirList.size(), bucketKey);
 
         // If trash is enabled, move files to trash, instead of send delete 
requests.
         // OBS bucket doesn't support trash.
         if (bucket.getBucketLayout() == BucketLayout.OBJECT_STORE) {
-          sendDeleteKeysRequest(bucket.getVolumeName(), bucket.getBucketName(),
-              expiredKeyNameList, expiredKeyUpdateIDList, false);
+          sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), 
bucket.getBucketName(),
+              expiredKeyList, false);
         } else if (ozoneTrash != null) {
           // move keys to trash
           // TODO: add unit test in next patch
-          moveKeysToTrash(expiredKeyNameList);
+          moveKeysToTrash(bucket.getVolumeName(), bucket.getBucketName(), 
expiredKeyList);
         } else {
-          sendDeleteKeysRequest(bucket.getVolumeName(), 
bucket.getBucketName(), expiredKeyNameList,
-              expiredKeyUpdateIDList, false);
-          if (!expiredDirNameList.isEmpty()) {
-            sendDeleteKeysRequest(bucket.getVolumeName(), 
bucket.getBucketName(), expiredDirNameList,
-                expiredDirUpdateIDList, true);
+          sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), 
bucket.getBucketName(), expiredKeyList, false);
+          if (!expiredDirList.isEmpty()) {
+            sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), 
bucket.getBucketName(), expiredDirList, true);
           }
         }
         onSuccess(bucketKey);
@@ -316,9 +312,8 @@ public BackgroundTaskResult call() {
 
     @SuppressWarnings("checkstyle:parameternumber")
     private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket, 
String bucketKey,
-                                   Table<String, OmKeyInfo> keyTable, 
List<OmLCRule> ruleList,
-                                   List<String> expiredKeyList, List<Long> 
expiredKeyUpdateIDList,
-                                   List<String> expiredDirList, List<Long> 
expiredDirUpdateIDList) {
+        Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
+        LimitedExpiredObjectList expiredKeyList, LimitedExpiredObjectList 
expiredDirList) {
       List<OmLCRule> directoryStylePrefixRuleList =
           ruleList.stream().filter(r -> 
r.isDirectoryStylePrefix()).collect(Collectors.toList());
       List<OmLCRule> nonDirectoryStylePrefixRuleList =
@@ -339,47 +334,59 @@ private void evaluateFSOBucket(OmVolumeArgs volume, 
OmBucketInfo bucket, String
           continue;
         }
         // use last directory's object ID to iterate the keys
-        String prefix = OzoneConsts.OM_KEY_PREFIX + volume.getObjectID() +
-            OzoneConsts.OM_KEY_PREFIX + bucket.getObjectID() + 
OzoneConsts.OM_KEY_PREFIX;
+        String prefix = OM_KEY_PREFIX + volume.getObjectID() +
+            OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
         StringBuffer directoryPath = new StringBuffer();
         if (!dirList.isEmpty()) {
-          prefix += dirList.get(dirList.size() - 1).getObjectID();
+          OmDirectoryInfo lastDir = dirList.get(dirList.size() - 1);
+          prefix += lastDir.getObjectID();
           for (OmDirectoryInfo dir : dirList) {
-            
directoryPath.append(dir.getName()).append(OzoneConsts.OM_KEY_PREFIX);
+            directoryPath.append(dir.getName()).append(OM_KEY_PREFIX);
           }
-          if (directoryPath.toString().equals(rule.getEffectivePrefix() + 
OzoneConsts.OM_KEY_PREFIX)) {
-            expiredDirList.add(directoryPath.toString());
-            expiredDirUpdateIDList.add(dirList.get(dirList.size() - 
1).getUpdateID());
+          if ((directoryPath.toString().equals(rule.getEffectivePrefix()) ||
+              directoryPath.toString().equals(rule.getEffectivePrefix() + 
OM_KEY_PREFIX))
+              && rule.match(lastDir, directoryPath.toString())) {
+            if (expiredDirList.isFull()) {
+              // if expiredDirList is full, send delete request for pending 
deletion directories
+              sendDeleteKeysRequestAndClearList(volume.getVolume(), 
bucket.getBucketName(),
+                  expiredDirList, true);
+            }
+            expiredDirList.add(directoryPath.toString(), 
lastDir.getUpdateID());
           }
         }
 
         LOG.info("Prefix {} for {}", prefix, bucketKey);
-        evaluateKeyTable(keyTable, prefix, directoryPath.toString(), rule, 
expiredKeyList,
-            expiredKeyUpdateIDList, bucketKey);
+        evaluateKeyTable(keyTable, prefix, directoryPath.toString(), rule, 
expiredKeyList, bucket);
         evaluateDirTable(directoryInfoTable, prefix, directoryPath.toString(), 
rule,
-            expiredDirList, expiredDirUpdateIDList, bucketKey);
+            expiredDirList, bucket);
       }
 
       for (OmLCRule rule : nonDirectoryStylePrefixRuleList) {
         // find the directory for the prefix, it may not exist
         OmDirectoryInfo dirInfo = getDirectory(volume, bucket, 
rule.getEffectivePrefix(), bucketKey);
-        String prefix = OzoneConsts.OM_KEY_PREFIX + volume.getObjectID() +
-            OzoneConsts.OM_KEY_PREFIX + bucket.getObjectID() + 
OzoneConsts.OM_KEY_PREFIX;
+        String prefix = OM_KEY_PREFIX + volume.getObjectID() +
+            OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
         if (dirInfo != null) {
           prefix += dirInfo.getObjectID();
-          if (dirInfo.getName().equals(rule.getEffectivePrefix())) {
-            expiredDirList.add(dirInfo.getName());
-            expiredDirUpdateIDList.add(dirInfo.getUpdateID());
+          if (dirInfo.getName().equals(rule.getEffectivePrefix()) && 
rule.match(dirInfo, dirInfo.getName())) {
+            if (expiredDirList.isFull()) {
+              // if expiredDirList is full, send delete request for pending 
deletion directories
+              sendDeleteKeysRequestAndClearList(volume.getVolume(), 
bucket.getBucketName(),
+                  expiredDirList, true);
+            }
+            expiredDirList.add(dirInfo.getName(), dirInfo.getUpdateID());
           }
         }
         LOG.info("Prefix {} for {}", prefix, bucketKey);
-        evaluateKeyTable(keyTable, prefix, "", rule, expiredKeyList, 
expiredKeyUpdateIDList, bucketKey);
-        evaluateDirTable(directoryInfoTable, prefix, "", rule, expiredDirList, 
expiredDirUpdateIDList, bucketKey);
+        evaluateKeyTable(keyTable, prefix, dirInfo == null ? "" : 
dirInfo.getName() + OzoneConsts.OM_KEY_PREFIX,
+            rule, expiredKeyList, bucket);
+        evaluateDirTable(directoryInfoTable, prefix,
+            dirInfo == null ? "" : dirInfo.getName() + 
OzoneConsts.OM_KEY_PREFIX, rule, expiredDirList, bucket);
       }
 
       if (!noPrefixRuleList.isEmpty()) {
-        String prefix = OzoneConsts.OM_KEY_PREFIX + volume.getObjectID() +
-            OzoneConsts.OM_KEY_PREFIX + bucket.getObjectID() + 
OzoneConsts.OM_KEY_PREFIX;
+        String prefix = OM_KEY_PREFIX + volume.getObjectID() +
+            OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
         LOG.info("prefix {} for {}", prefix, bucketKey);
         // use bucket name as key iterator prefix
         try (TableIterator<String, ? extends Table.KeyValue<String, 
OmKeyInfo>> keyTblItr =
@@ -391,8 +398,12 @@ private void evaluateFSOBucket(OmVolumeArgs volume, 
OmBucketInfo bucket, String
             for (OmLCRule rule : noPrefixRuleList) {
               if (rule.match(key)) {
                 // mark key as expired, check next key
-                expiredKeyList.add(key.getKeyName());
-                expiredKeyUpdateIDList.add(key.getUpdateID());
+                if (expiredKeyList.isFull()) {
+                  // if expiredKeyList is full, send delete request for 
pending deletion keys
+                  sendDeleteKeysRequestAndClearList(volume.getVolume(), 
bucket.getBucketName(),
+                      expiredKeyList, false);
+                }
+                expiredKeyList.add(key.getKeyName(), key.getUpdateID());
                 sizeKeyDeleted += key.getReplicatedSize();
                 break;
               }
@@ -411,22 +422,28 @@ private void evaluateFSOBucket(OmVolumeArgs volume, 
OmBucketInfo bucket, String
             numDirIterated++;
             for (OmLCRule rule : noPrefixRuleList) {
               if (rule.match(dir, dir.getPath())) {
-                // mark key as expired, check next key
-                expiredDirList.add(dir.getPath());
-                expiredDirUpdateIDList.add(dir.getUpdateID());
+                // mark directory as expired, check next directory
+                if (expiredDirList.isFull()) {
+                  // if expiredDirList is full, send delete request for 
pending deletion directories
+                  sendDeleteKeysRequestAndClearList(volume.getVolume(), 
bucket.getBucketName(),
+                      expiredDirList, true);
+                }
+                expiredDirList.add(dir.getPath(), dir.getUpdateID());
                 break;
               }
             }
           }
         } catch (IOException e) {
-          // log failure and continue the process to delete/move files already 
identified in this run
-          LOG.warn("Failed to iterate keyTable for bucket {}", bucketKey, e);
+          // log failure and continue the process to delete/move directories 
already identified in this run
+          LOG.warn("Failed to iterate directoryTable for bucket {}", 
bucketKey, e);
         }
       }
     }
 
     private void evaluateKeyTable(Table<String, OmKeyInfo> keyTable, String 
prefix, String directoryPath,
-        OmLCRule rule, List<String> keyList, List<Long> keyUpdateIDList, 
String bucketKey) {
+        OmLCRule rule, LimitedExpiredObjectList keyList, OmBucketInfo bucket) {
+      String volumeName = bucket.getVolumeName();
+      String bucketName = bucket.getBucketName();
       try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyTblItr =
                keyTable.iterator(prefix)) {
         while (keyTblItr.hasNext()) {
@@ -436,19 +453,24 @@ private void evaluateKeyTable(Table<String, OmKeyInfo> 
keyTable, String prefix,
           numKeyIterated++;
           if (rule.match(key, keyPath)) {
             // mark key as expired, check next key
-            keyList.add(keyPath);
-            keyUpdateIDList.add(key.getUpdateID());
+            if (keyList.isFull()) {
+              // if keyList is full, send delete request for pending deletion 
keys
+              sendDeleteKeysRequestAndClearList(volumeName, bucketName, 
keyList, false);
+            }
+            keyList.add(keyPath, key.getUpdateID());
             sizeKeyDeleted += key.getReplicatedSize();
           }
         }
       } catch (IOException e) {
         // log failure and continue the process to delete/move files already 
identified in this run
-        LOG.warn("Failed to iterate keyTable for bucket {}", bucketKey, e);
+        LOG.warn("Failed to iterate keyTable for bucket {}/{}", volumeName, 
bucketName, e);
       }
     }
 
     private void evaluateDirTable(Table<String, OmDirectoryInfo> 
directoryInfoTable, String prefix,
-        String directoryPath, OmLCRule rule, List<String> dirList, List<Long> 
dirUpdateIDList, String bucketKey) {
+        String directoryPath, OmLCRule rule, LimitedExpiredObjectList dirList, 
OmBucketInfo bucket) {
+      String volumeName = bucket.getVolumeName();
+      String bucketName = bucket.getBucketName();
       try (TableIterator<String, ? extends Table.KeyValue<String, 
OmDirectoryInfo>> dirTblItr =
                directoryInfoTable.iterator(prefix)) {
         while (dirTblItr.hasNext()) {
@@ -458,22 +480,26 @@ private void evaluateDirTable(Table<String, 
OmDirectoryInfo> directoryInfoTable,
           numDirIterated++;
           if (rule.match(dir, dirPath)) {
             // mark dir as expired, check next key
-            dirList.add(dirPath);
-            dirUpdateIDList.add(dir.getUpdateID());
+            if (dirList.isFull()) {
+              // if dirList is full, send delete request for pending deletion 
directories
+              sendDeleteKeysRequestAndClearList(volumeName, bucketName, 
dirList, true);
+            }
+            dirList.add(dirPath, dir.getUpdateID());
           }
         }
       } catch (IOException e) {
         // log failure and continue the process to delete/move files already 
identified in this run
-        LOG.warn("Failed to iterate directoryInfoTable for bucket {}", 
bucketKey, e);
+        LOG.warn("Failed to iterate directoryTable for bucket {}/{}", 
volumeName, bucketName, e);
       }
     }
 
-    private void evaluateBucket(String bucketKey,
-        Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
-        List<String> expiredKeyList, List<Long> expiredKeyUpdateIDList) {
+    private void evaluateBucket(OmBucketInfo bucketInfo,
+        Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList, 
LimitedExpiredObjectList expiredKeyList) {
+      String volumeName = bucketInfo.getVolumeName();
+      String bucketName = bucketInfo.getBucketName();
       // use bucket name as key iterator prefix
       try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> 
keyTblItr =
-               keyTable.iterator(bucketKey)) {
+               keyTable.iterator(omMetadataManager.getBucketKey(volumeName, 
bucketName))) {
         while (keyTblItr.hasNext()) {
           Table.KeyValue<String, OmKeyInfo> keyValue = keyTblItr.next();
           OmKeyInfo key = keyValue.getValue();
@@ -481,8 +507,11 @@ private void evaluateBucket(String bucketKey,
           for (OmLCRule rule : ruleList) {
             if (rule.match(key)) {
               // mark key as expired, check next key
-              expiredKeyList.add(key.getKeyName());
-              expiredKeyUpdateIDList.add(key.getUpdateID());
+              if (expiredKeyList.isFull()) {
+                // if expiredKeyList is full, send delete request for pending 
deletion keys
+                sendDeleteKeysRequestAndClearList(volumeName, bucketName, 
expiredKeyList, false);
+              }
+              expiredKeyList.add(key.getKeyName(), key.getUpdateID());
               sizeKeyDeleted += key.getReplicatedSize();
               break;
             }
@@ -490,7 +519,7 @@ private void evaluateBucket(String bucketKey,
         }
       } catch (IOException e) {
         // log failure and continue the process to delete/move files already 
identified in this run
-        LOG.warn("Failed to iterate through bucket {}", bucketKey, e);
+        LOG.warn("Failed to iterate through bucket {}/{}", volumeName, 
bucketName, e);
       }
     }
 
@@ -560,8 +589,8 @@ private void onSuccess(String bucketName) {
           timeSpent, bucketName, numKeyIterated, numDirIterated, 
numKeyDeleted, sizeKeyDeleted, numDirDeleted);
     }
 
-    private void sendDeleteKeysRequest(String volume, String bucket, 
List<String> keysList,
-        List<Long> expiredKeyUpdateIDList, boolean dir) {
+    private void sendDeleteKeysRequestAndClearList(String volume, String 
bucket,
+        LimitedExpiredObjectList keysList, boolean dir) {
       try {
         if (getInjector(1) != null) {
           try {
@@ -571,7 +600,7 @@ private void sendDeleteKeysRequest(String volume, String 
bucket, List<String> ke
           }
         }
 
-        int batchSize = keyLimitPerIterator;
+        int batchSize = keyDeleteBatchSize;
         int startIndex = 0;
         for (int i = 0; i < keysList.size();) {
           DeleteKeyArgs.Builder builder =
@@ -579,12 +608,12 @@ private void sendDeleteKeysRequest(String volume, String 
bucket, List<String> ke
           int endIndex = startIndex + (batchSize < (keysList.size() - 
startIndex) ?
               batchSize : keysList.size() - startIndex);
           int keyCount = endIndex - startIndex;
-          builder.addAllKeys(keysList.subList(startIndex, endIndex));
-          builder.addAllUpdateIDs(expiredKeyUpdateIDList.subList(startIndex, 
endIndex));
+          builder.addAllKeys(keysList.nameSubList(startIndex, endIndex));
+          builder.addAllUpdateIDs(keysList.updateIDSubList(startIndex, 
endIndex));
 
           DeleteKeyArgs deleteKeyArgs = builder.build();
           DeleteKeysRequest deleteKeysRequest = 
DeleteKeysRequest.newBuilder().setDeleteKeys(deleteKeyArgs).build();
-          LOG.info("request size {} for {} keys", 
deleteKeysRequest.getSerializedSize(), keyCount);
+          LOG.debug("request size {} for {} keys", 
deleteKeysRequest.getSerializedSize(), keyCount);
 
           if (deleteKeysRequest.getSerializedSize() < ratisByteLimit) {
             // send request out
@@ -594,9 +623,24 @@ private void sendDeleteKeysRequest(String volume, String 
bucket, List<String> ke
                 .setClientId(clientId.toString())
                 .setDeleteKeysRequest(deleteKeysRequest)
                 .build();
-            OzoneManagerRatisUtils.submitRequest(getOzoneManager(), omRequest, 
clientId, callId.getAndIncrement());
+            long startTime = System.nanoTime();
+            final OzoneManagerProtocolProtos.OMResponse response = 
OzoneManagerRatisUtils.submitRequest(
+                getOzoneManager(), omRequest, clientId, 
callId.getAndIncrement());
+            long endTime = System.nanoTime();
+            LOG.debug("DeleteKeys request with {} keys cost {} ns", keyCount, 
endTime - startTime);
             i += batchSize;
             startIndex += batchSize;
+            if (response != null) {
+              if (!response.getSuccess()) {
+                // log the failure and continue iterating
+                LOG.error("DeleteKeys request failed with " + 
response.getMessage() +
+                    " for volume: {}, bucket: {}", volume, bucket);
+                continue;
+              } else {
+                LOG.debug("DeleteKeys request of total {} keys, {} not 
deleted", keyCount,
+                    response.getDeleteKeysResponse().getErrorsCount());
+              }
+            }
             if (dir) {
               numDirDeleted += keyCount;
               metrics.incrNumDirDeleted(keyCount);
@@ -608,18 +652,20 @@ private void sendDeleteKeysRequest(String volume, String 
bucket, List<String> ke
             batchSize /= 2;
           }
         }
+        keysList.clear();
       } catch (ServiceException e) {
         LOG.error("Failed to send DeleteKeysRequest", e);
       }
     }
 
-    private void moveKeysToTrash(List<String> keysList) {
-      for (String key : keysList) {
+    private void moveKeysToTrash(String volume, String bucket, 
LimitedExpiredObjectList keysList) {
+      for (int index = 0; index < keysList.size(); index++) {
         try {
-          ozoneTrash.moveToTrash(new Path(key));
+          ozoneTrash.moveToTrash(new Path(OM_KEY_PREFIX + volume + 
OM_KEY_PREFIX + bucket + OM_KEY_PREFIX +
+              keysList.getName(index)));
         } catch (IOException e) {
           // log failure and continue
-          LOG.warn("Failed to move key {} to trash", key, e);
+          LOG.warn("Failed to move key {} to trash", keysList.getName(index), 
e);
         }
       }
     }
@@ -634,4 +680,114 @@ public static FaultInjector getInjector(int index) {
   public static void setInjectors(List<FaultInjector> instance) {
     injectors = instance;
   }
+
+  @VisibleForTesting
+  public static Logger getLog() {
+    return LOG;
+  }
+
+  @VisibleForTesting
+  public void setListMaxSize(int size) {
+    this.listMaxSize = size;
+  }
+
+  /**
+   * An in-memory list with limited size to hold expired object infos, 
including object name and current update ID.
+   */
+  public static class LimitedExpiredObjectList {
+    private final LimitedSizeList<String> objectNames;
+    private final List<Long> objectUpdateIDs;
+
+    public LimitedExpiredObjectList(int maxListSize) {
+      this.objectNames = new LimitedSizeList<>(maxListSize);
+      this.objectUpdateIDs = new ArrayList<>();
+    }
+
+    public void add(String name, long updateID) {
+      objectNames.add(name);
+      objectUpdateIDs.add(updateID);
+    }
+
+    public int size() {
+      return objectNames.size();
+    }
+
+    public List<String> nameSubList(int fromIndex, int toIndex) {
+      return objectNames.subList(fromIndex, toIndex);
+    }
+
+    public List<Long> updateIDSubList(int fromIndex, int toIndex) {
+      return objectUpdateIDs.subList(fromIndex, toIndex);
+    }
+
+    public void clear() {
+      objectNames.clear();
+      objectUpdateIDs.clear();
+    }
+
+    public boolean isEmpty() {
+      return objectNames.isEmpty();
+    }
+
+    public boolean isFull() {
+      return objectNames.isFull();
+    }
+
+    public String getName(int index) {
+      return objectNames.get(index);
+    }
+
+    public long getUpdateID(int index) {
+      return objectUpdateIDs.get(index);
+    }
+  }
+
+  /**
+   * An in-memory list with a maximum size. This class is not thread safe.
+   */
+  public static class LimitedSizeList<T> {
+    private final List<T> internalList;
+    private final int maxSize;
+
+    public LimitedSizeList(int maxSize) {
+      this.maxSize = maxSize;
+      this.internalList = new ArrayList<>();
+    }
+
+    /**
+     * Add an element to the list. It blindly adds the element without 
checking whether the list is full.
+     * Caller must check the size of the list through isFull() before calling 
this method.
+     */
+    public void add(T element) {
+      internalList.add(element);
+    }
+
+    public T get(int index) {
+      return internalList.get(index);
+    }
+
+    public int size() {
+      return internalList.size();
+    }
+
+    public List<T> subList(int fromIndex, int toIndex) {
+      return internalList.subList(fromIndex, toIndex);
+    }
+
+    public boolean isEmpty() {
+      return internalList.isEmpty();
+    }
+
+    public boolean isFull() {
+      boolean full = internalList.size() >= maxSize;
+      if (full) {
+        LOG.debug("LimitedSizeList has reached maximum size {}", maxSize);
+      }
+      return full;
+    }
+
+    public void clear() {
+      internalList.clear();
+    }
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
index 2ecbc540a76..26b82c0c5fa 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
@@ -20,9 +20,11 @@
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
+import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -104,8 +106,10 @@
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
 
 /**
  * Test Key Lifecycle Service.
@@ -126,7 +130,8 @@ class TestKeyLifecycleService extends OzoneTestBase {
   private static final AtomicInteger OBJECT_ID_COUNTER = new AtomicInteger();
   private static final int KEY_COUNT = 10;
   private static final int EXPIRE_SECONDS = 2;
-  private static final int SERVICE_INTERVAL = 500;
+  private static final int SERVICE_INTERVAL = 300;
+  private static final int WAIT_CHECK_INTERVAL = 50;
 
   private OzoneConfiguration conf;
   private OzoneManagerProtocol writeClient;
@@ -152,6 +157,7 @@ private void createConfig(File testDir) {
         200, TimeUnit.MILLISECONDS);
     conf.setBoolean(OZONE_KEY_LIFECYCLE_SERVICE_ENABLED, true);
     conf.setTimeDuration(OZONE_KEY_LIFECYCLE_SERVICE_INTERVAL, 
SERVICE_INTERVAL, TimeUnit.MILLISECONDS);
+    conf.setInt(OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE, 50);
     conf.setQuietMode(false);
     OmLCExpiration.setTest(true);
   }
@@ -192,19 +198,18 @@ void resume() {
 
     @AfterAll
     void cleanup() {
-      if (om.stop()) {
+      if (om != null) {
+        om.stop();
         om.join();
       }
     }
 
     public Stream<Arguments> parameters1() {
       return Stream.of(
-          arguments(BucketLayout.FILE_SYSTEM_OPTIMIZED, true),
-          arguments(BucketLayout.FILE_SYSTEM_OPTIMIZED, false),
+          arguments(FILE_SYSTEM_OPTIMIZED, true),
+          arguments(FILE_SYSTEM_OPTIMIZED, false),
           arguments(BucketLayout.OBJECT_STORE, true),
-          arguments(BucketLayout.OBJECT_STORE, false),
-          arguments(BucketLayout.LEGACY, true),
-          arguments(BucketLayout.LEGACY, false)
+          arguments(BucketLayout.OBJECT_STORE, false)
       );
     }
 
@@ -226,9 +231,9 @@ void testAllKeyExpired(BucketLayout bucketLayout, boolean 
createPrefix) throws I
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -240,7 +245,7 @@ void testAllKeyExpired(BucketLayout bucketLayout, boolean 
createPrefix) throws I
       }
 
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
@@ -258,9 +263,9 @@ void testOneKeyExpired(BucketLayout bucketLayout, boolean 
createPrefix) throws I
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -273,7 +278,7 @@ void testOneKeyExpired(BucketLayout bucketLayout, boolean 
createPrefix) throws I
         createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
       }
 
-      GenericTestUtils.waitFor(() -> (getDeletedKeyCount() - 
initialDeletedKeyCount) == 1, SERVICE_INTERVAL, 10000);
+      GenericTestUtils.waitFor(() -> (getDeletedKeyCount() - 
initialDeletedKeyCount) == 1, WAIT_CHECK_INTERVAL, 10000);
       assertEquals(KEY_COUNT - 1, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
@@ -303,13 +308,13 @@ void testOnlyKeyExpired(BucketLayout bucketLayout, 
boolean createPrefix) throws
         createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
       }
 
-      GenericTestUtils.waitFor(() -> (getDeletedKeyCount() - 
initialDeletedKeyCount) == 1, SERVICE_INTERVAL, 10000);
+      GenericTestUtils.waitFor(() -> (getDeletedKeyCount() - 
initialDeletedKeyCount) == 1, WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testAllKeyExpiredWithTag(BucketLayout bucketLayout) throws 
IOException,
         TimeoutException, InterruptedException {
       final String volumeName = getTestName();
@@ -323,9 +328,9 @@ void testAllKeyExpiredWithTag(BucketLayout bucketLayout) 
throws IOException,
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, tags);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -333,13 +338,13 @@ void testAllKeyExpiredWithTag(BucketLayout bucketLayout) 
throws IOException,
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
 
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testOneKeyExpiredWithTag(BucketLayout bucketLayout) throws 
IOException,
         TimeoutException, InterruptedException {
       int keyCount = KEY_COUNT;
@@ -361,22 +366,22 @@ void testOneKeyExpiredWithTag(BucketLayout bucketLayout) 
throws IOException,
       keyCount++;
 
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(keyCount, keyList.size());
-      assertEquals(keyCount, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT + 1,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
       OmLCFilter.Builder filter = getOmLCFilterBuilder(null, tag, null);
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
 
-      GenericTestUtils.waitFor(() -> (getDeletedKeyCount() - 
initialDeletedKeyCount) == 1, SERVICE_INTERVAL, 10000);
+      GenericTestUtils.waitFor(() -> (getDeletedKeyCount() - 
initialDeletedKeyCount) == 1, WAIT_CHECK_INTERVAL, 10000);
       assertEquals(keyCount - 1, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testAllKeyExpiredWithAndOperator(BucketLayout bucketLayout) throws 
IOException,
         TimeoutException, InterruptedException {
       final String volumeName = getTestName();
@@ -389,9 +394,9 @@ void testAllKeyExpiredWithAndOperator(BucketLayout 
bucketLayout) throws IOExcept
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, tags);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -400,13 +405,13 @@ void testAllKeyExpiredWithAndOperator(BucketLayout 
bucketLayout) throws IOExcept
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
 
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testOneKeyExpiredWithAndOperator(BucketLayout bucketLayout) throws 
IOException,
         TimeoutException, InterruptedException {
       int keyCount = KEY_COUNT;
@@ -427,9 +432,9 @@ void testOneKeyExpiredWithAndOperator(BucketLayout 
bucketLayout) throws IOExcept
       keyCount++;
 
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(keyCount, keyList.size());
-      assertEquals(keyCount, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT + 1,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -437,13 +442,13 @@ void testOneKeyExpiredWithAndOperator(BucketLayout 
bucketLayout) throws IOExcept
       OmLCFilter.Builder filter = getOmLCFilterBuilder(null, null, 
andOperator);
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
 
-      GenericTestUtils.waitFor(() -> (getDeletedKeyCount() - 
initialDeletedKeyCount) == 1, SERVICE_INTERVAL, 10000);
+      GenericTestUtils.waitFor(() -> (getDeletedKeyCount() - 
initialDeletedKeyCount) == 1, WAIT_CHECK_INTERVAL, 10000);
       assertEquals(keyCount - 1, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testEmptyPrefix(BucketLayout bucketLayout) throws IOException, 
TimeoutException, InterruptedException {
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
@@ -454,22 +459,22 @@ void testEmptyPrefix(BucketLayout bucketLayout) throws 
IOException, TimeoutExcep
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, "", null, 
date.toString(), true);
 
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testEmptyFilter(BucketLayout bucketLayout) throws IOException,
         TimeoutException, InterruptedException {
       final String volumeName = getTestName();
@@ -481,9 +486,9 @@ void testEmptyFilter(BucketLayout bucketLayout) throws 
IOException,
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -491,7 +496,7 @@ void testEmptyFilter(BucketLayout bucketLayout) throws 
IOException,
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
 
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
@@ -516,17 +521,17 @@ void testRootSlashPrefix(BucketLayout bucketLayout, 
String prefix)
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
keyPrefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix, 
null, date.toString(), true);
 
-      if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+      if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
         GenericTestUtils.waitFor(() ->
-            (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+            (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
         assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       } else {
         Thread.sleep(EXPIRE_SECONDS);
@@ -540,7 +545,7 @@ void testRootSlashPrefix(BucketLayout bucketLayout, String 
prefix)
     @MethodSource("parameters1")
     void testSlashPrefix(BucketLayout bucketLayout, boolean createPrefix)
         throws IOException, TimeoutException, InterruptedException {
-      assumeTrue(bucketLayout != BucketLayout.FILE_SYSTEM_OPTIMIZED);
+      assumeTrue(bucketLayout != FILE_SYSTEM_OPTIMIZED);
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
       String keyPrefix = "key";
@@ -550,9 +555,9 @@ void testSlashPrefix(BucketLayout bucketLayout, boolean 
createPrefix)
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
keyPrefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -563,9 +568,9 @@ void testSlashPrefix(BucketLayout bucketLayout, boolean 
createPrefix)
         createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
       }
 
-      if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+      if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
         GenericTestUtils.waitFor(() ->
-            (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+            (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
         assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       } else {
         Thread.sleep(EXPIRE_SECONDS);
@@ -579,7 +584,7 @@ void testSlashPrefix(BucketLayout bucketLayout, boolean 
createPrefix)
     @MethodSource("parameters1")
     void testSlashKey(BucketLayout bucketLayout, boolean createPrefix)
         throws IOException, TimeoutException, InterruptedException {
-      assumeTrue(bucketLayout != BucketLayout.FILE_SYSTEM_OPTIMIZED);
+      assumeTrue(bucketLayout != FILE_SYSTEM_OPTIMIZED);
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
       String keyPrefix = "/key//";
@@ -589,9 +594,9 @@ void testSlashKey(BucketLayout bucketLayout, boolean 
createPrefix)
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
keyPrefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -603,16 +608,16 @@ void testSlashKey(BucketLayout bucketLayout, boolean 
createPrefix)
       }
 
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testSlashKeyWithAndOperator(BucketLayout bucketLayout)
         throws IOException, TimeoutException, InterruptedException {
-      assumeTrue(bucketLayout != BucketLayout.FILE_SYSTEM_OPTIMIZED);
+      assumeTrue(bucketLayout != FILE_SYSTEM_OPTIMIZED);
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
       String keyPrefix = "/key//";
@@ -623,9 +628,9 @@ void testSlashKeyWithAndOperator(BucketLayout bucketLayout)
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
keyPrefix, tags);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -634,16 +639,16 @@ void testSlashKeyWithAndOperator(BucketLayout 
bucketLayout)
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, null, 
filter.build(), date.toString(), true);
 
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
-    void testSlashPrefixWithAndOperator(BucketLayout bucketLayout) throws 
IOException, InterruptedException {
-      assumeTrue(bucketLayout != BucketLayout.FILE_SYSTEM_OPTIMIZED);
-      final int keyCount = KEY_COUNT;
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
+    void testSlashPrefixWithAndOperator(BucketLayout bucketLayout)
+        throws IOException, InterruptedException, TimeoutException {
+      assumeTrue(bucketLayout != FILE_SYSTEM_OPTIMIZED);
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
       String keyPrefix = "key";
@@ -652,11 +657,11 @@ void testSlashPrefixWithAndOperator(BucketLayout 
bucketLayout) throws IOExceptio
       Map<String, String> tags = ImmutableMap.of("app", "spark", "user", 
"ozone");
       // create keys
       List<OmKeyArgs> keyList =
-          createKeys(volumeName, bucketName, bucketLayout, keyCount, 1, 
keyPrefix, tags);
+          createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
keyPrefix, tags);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
-      assertEquals(keyCount, keyList.size());
-      assertEquals(keyCount, getKeyCount(bucketLayout) - initialKeyCount);
+      assertEquals(KEY_COUNT, keyList.size());
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -666,7 +671,7 @@ void testSlashPrefixWithAndOperator(BucketLayout 
bucketLayout) throws IOExceptio
 
       Thread.sleep(EXPIRE_SECONDS);
       assertEquals(0, getDeletedKeyCount() - initialDeletedKeyCount);
-      assertEquals(keyCount, getKeyCount(bucketLayout) - initialKeyCount);
+      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
@@ -685,16 +690,16 @@ void testComplexPrefix(BucketLayout bucketLayout) throws 
IOException,
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix, 
null, date.toString(), true);
 
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       assertEquals(KEY_COUNT, metrics.getNumKeyDeleted().value() - 
initialNumDeletedKey);
       // each key is 1000 bytes size
@@ -703,8 +708,8 @@ void testComplexPrefix(BucketLayout bucketLayout) throws 
IOException,
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
-    void testPrefixNotMatch(BucketLayout bucketLayout) throws IOException, 
InterruptedException {
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
+    void testPrefixNotMatch(BucketLayout bucketLayout) throws IOException, 
InterruptedException, TimeoutException {
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
       String keyPrefix = "dir1/dir2/dir3/key";
@@ -715,9 +720,9 @@ void testPrefixNotMatch(BucketLayout bucketLayout) throws 
IOException, Interrupt
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
keyPrefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -731,7 +736,7 @@ void testPrefixNotMatch(BucketLayout bucketLayout) throws 
IOException, Interrupt
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testExpireKeysUnderDirectory(BucketLayout bucketLayout) throws 
IOException,
         TimeoutException, InterruptedException {
       final String volumeName = getTestName();
@@ -743,11 +748,11 @@ void testExpireKeysUnderDirectory(BucketLayout 
bucketLayout) throws IOException,
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // assert directory "dir1/dir2/dir3" exists
-      if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+      if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
         KeyInfoWithVolumeContext keyInfo = getDirectory(volumeName, 
bucketName, "dir1/dir2/dir3");
         assertFalse(keyInfo.getKeyInfo().isFile());
       }
@@ -758,12 +763,70 @@ void testExpireKeysUnderDirectory(BucketLayout 
bucketLayout) throws IOException,
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix, 
null, date.toString(), true);
 
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     public Stream<Arguments> parameters3() {
+      return Stream.of(
+          arguments("dir1/dir2/dir3/key", "dir1/dir2/dir3/", "dir1/dir2/dir3"),
+          arguments("dir1/dir2/dir3/key", "dir1/dir2/dir3", "dir1/dir2/dir3"),
+          arguments("dir1/key", "dir1", "dir1"),
+          arguments("dir1/key", "dir1/", "dir1"));
+    }
+
+    @ParameterizedTest
+    @MethodSource("parameters3")
+    void testMatchedDirectoryNotDeleted(String keyPrefix, String rulePrefix, 
String dirName) throws IOException,
+        TimeoutException, InterruptedException {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      long initialDeletedKeyCount = getDeletedKeyCount();
+      long initialDeletedDirCount = getDeletedDirectoryCount();
+      long initialKeyCount = getKeyCount(FILE_SYSTEM_OPTIMIZED);
+      // create keys
+      List<OmKeyArgs> keyList =
+          createKeys(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, KEY_COUNT, 
1, keyPrefix, null);
+      // check there are keys in keyTable
+      assertEquals(KEY_COUNT, keyList.size());
+      GenericTestUtils.waitFor(() -> getKeyCount(FILE_SYSTEM_OPTIMIZED) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
+
+      // assert directory exists
+      KeyInfoWithVolumeContext keyInfo = getDirectory(volumeName, bucketName, 
dirName);
+      assertFalse(keyInfo.getKeyInfo().isFile());
+
+      KeyLifecycleService.setInjectors(
+          Arrays.asList(new FaultInjectorImpl(), new FaultInjectorImpl()));
+
+      // create Lifecycle configuration
+      ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+      ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+      createLifecyclePolicy(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, 
rulePrefix, null, date.toString(), true);
+      LOG.info("expiry date {}", date.toInstant().toEpochMilli());
+
+      GenericTestUtils.waitFor(() -> 
date.isBefore(ZonedDateTime.now(ZoneOffset.UTC)), WAIT_CHECK_INTERVAL, 10000);
+
+      // rename a key under directory to change directory's Modification time
+      writeClient.renameKey(keyList.get(0), keyList.get(0).getKeyName() + 
"-new");
+      LOG.info("Dir {} refreshes its modification time", dirName);
+
+      // resume KeyLifecycleService bucket scan
+      KeyLifecycleService.getInjector(0).resume();
+      KeyLifecycleService.getInjector(1).resume();
+
+      GenericTestUtils.waitFor(() ->
+          (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
+      assertEquals(0, getKeyCount(FILE_SYSTEM_OPTIMIZED) - initialKeyCount);
+      assertEquals(0, getDeletedDirectoryCount() - initialDeletedDirCount);
+      KeyInfoWithVolumeContext directory = getDirectory(volumeName, 
bucketName, dirName);
+      assertNotNull(directory);
+
+      deleteLifecyclePolicy(volumeName, bucketName);
+    }
+
+    public Stream<Arguments> parameters4() {
       return Stream.of(
           arguments("dir1/dir2/dir3", "dir1/dir2/dir3", 3, 0, true, false),
           arguments("dir1/dir2/dir3", "dir1/dir2/dir3", 3, 0, false, true),
@@ -805,7 +868,7 @@ public Stream<Arguments> parameters3() {
     }
 
     @ParameterizedTest
-    @MethodSource("parameters3")
+    @MethodSource("parameters4")
     void testExpireOnlyDirectory(String dirName, String prefix, int dirDepth, 
int deletedDirCount,
         boolean createPrefix, boolean createFilterPrefix) throws IOException,
         TimeoutException, InterruptedException {
@@ -816,7 +879,7 @@ void testExpireOnlyDirectory(String dirName, String prefix, 
int dirDepth, int de
       long initialNumDeletedDir = metrics.getNumDirDeleted().value();
 
       // Create the directory
-      createVolumeAndBucket(volumeName, bucketName, 
BucketLayout.FILE_SYSTEM_OPTIMIZED, false);
+      createVolumeAndBucket(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, 
false);
       createDirectory(volumeName, bucketName, dirName);
       KeyInfoWithVolumeContext keyInfo = getDirectory(volumeName, bucketName, 
dirName);
       assertFalse(keyInfo.getKeyInfo().isFile());
@@ -828,24 +891,25 @@ void testExpireOnlyDirectory(String dirName, String 
prefix, int dirDepth, int de
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
       if (createPrefix) {
-        createLifecyclePolicy(volumeName, bucketName, 
BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        createLifecyclePolicy(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED,
             prefix, null, date.toString(), true);
       } else if (createFilterPrefix) {
         OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
-        createLifecyclePolicy(volumeName, bucketName, 
BucketLayout.FILE_SYSTEM_OPTIMIZED,
+        createLifecyclePolicy(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED,
             null, filter.build(), date.toString(), true);
       }
 
       GenericTestUtils.waitFor(
-          () -> (getDeletedDirectoryCount() - initialDeletedDirCount) == 
deletedDirCount, SERVICE_INTERVAL, 10000);
+          () -> (getDeletedDirectoryCount() - initialDeletedDirCount) == 
deletedDirCount, WAIT_CHECK_INTERVAL, 10000);
       assertEquals(dirDepth - deletedDirCount, getDirCount() - 
initialDirCount);
       assertEquals(deletedDirCount, metrics.getNumDirDeleted().value() - 
initialNumDeletedDir);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
-    void testExpireNonExistDirectory(BucketLayout bucketLayout) throws 
IOException, InterruptedException {
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
+    void testExpireNonExistDirectory(BucketLayout bucketLayout)
+        throws IOException, InterruptedException, TimeoutException {
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
       String prefix = "dir1/dir2/dir3/key";
@@ -857,11 +921,11 @@ void testExpireNonExistDirectory(BucketLayout 
bucketLayout) throws IOException,
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // assert directory "dir1/dir2/dir3" exists
-      if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+      if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
         KeyInfoWithVolumeContext keyInfo = getDirectory(volumeName, 
bucketName, dirPath);
         assertFalse(keyInfo.getKeyInfo().isFile());
       }
@@ -873,7 +937,7 @@ void testExpireNonExistDirectory(BucketLayout bucketLayout) 
throws IOException,
 
       Thread.sleep(EXPIRE_SECONDS);
 
-      if (bucketLayout == BucketLayout.FILE_SYSTEM_OPTIMIZED) {
+      if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
         assertEquals(0, getDeletedDirectoryCount() - initialDeletedDirCount);
         assertEquals(0, getDeletedKeyCount() - initialDeletedKeyCount);
       } else {
@@ -884,8 +948,8 @@ void testExpireNonExistDirectory(BucketLayout bucketLayout) 
throws IOException,
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
-    void testRuleDisabled(BucketLayout bucketLayout) throws IOException, 
InterruptedException {
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
+    void testRuleDisabled(BucketLayout bucketLayout) throws IOException, 
InterruptedException, TimeoutException {
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
       String prefix = "key";
@@ -895,9 +959,9 @@ void testRuleDisabled(BucketLayout bucketLayout) throws 
IOException, Interrupted
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -909,7 +973,7 @@ void testRuleDisabled(BucketLayout bucketLayout) throws 
IOException, Interrupted
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testOneRuleDisabledOneRuleEnabled(BucketLayout bucketLayout)
         throws IOException, InterruptedException, TimeoutException {
       final String volumeName = getTestName();
@@ -921,9 +985,9 @@ void testOneRuleDisabledOneRuleEnabled(BucketLayout 
bucketLayout)
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, KEY_COUNT, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(KEY_COUNT, keyList.size());
-      assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -939,13 +1003,13 @@ void testOneRuleDisabledOneRuleEnabled(BucketLayout 
bucketLayout)
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, ruleList);
 
       GenericTestUtils.waitFor(
-          () -> (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
SERVICE_INTERVAL, 10000);
+          () -> (getDeletedKeyCount() - initialDeletedKeyCount) == KEY_COUNT, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testKeyUpdatedShouldNotGetDeleted(BucketLayout bucketLayout)
         throws IOException, InterruptedException, TimeoutException {
       final String volumeName = getTestName();
@@ -975,7 +1039,7 @@ void testKeyUpdatedShouldNotGetDeleted(BucketLayout 
bucketLayout)
 
       GenericTestUtils.waitFor(
           () -> log.getOutput().contains(KEY_COUNT + " expired keys and 0 
expired dirs found"),
-          SERVICE_INTERVAL, 10000);
+          WAIT_CHECK_INTERVAL, 10000);
 
       OmKeyArgs key = 
keyList.get(ThreadLocalRandom.current().nextInt(keyList.size()));
+      // update a key before sending deletion requests
@@ -994,14 +1058,14 @@ void testKeyUpdatedShouldNotGetDeleted(BucketLayout 
bucketLayout)
       Thread.sleep(SERVICE_INTERVAL);
       KeyLifecycleService.getInjector(1).resume();
       String expectedString = "Received a request to delete a Key /" + 
key.getVolumeName() + "/" +
-          key.getBucketName() + "/" + key.getKeyName() +  " whose updateID not 
match or null";
-      GenericTestUtils.waitFor(() -> 
requestLog.getOutput().contains(expectedString), SERVICE_INTERVAL, 10000);
+          key.getBucketName() + "/" + key.getKeyName() + " whose updateID not 
match or null";
+      GenericTestUtils.waitFor(() -> 
requestLog.getOutput().contains(expectedString), WAIT_CHECK_INTERVAL, 10000);
 
+      // rename will change object's modificationTime. But since expiration 
action is an absolute timestamp,
+      // the renamed key will expire in the next evaluation task
       GenericTestUtils.waitFor(() -> log.getOutput().contains("1 expired keys 
and 0 expired dirs found"),
-          SERVICE_INTERVAL, 10000);
-      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == 0, SERVICE_INTERVAL, 10000);
+          WAIT_CHECK_INTERVAL, 10000);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == 0, WAIT_CHECK_INTERVAL, 10000);
       assertEquals(KEY_COUNT, getDeletedKeyCount() - initialDeletedKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
@@ -1032,7 +1096,7 @@ void testKeyUpdatedShouldNotGetDeleted(BucketLayout 
bucketLayout)
      * 
5259,/testPerformanceWithExpiredKeys/bucket1000002,500000,500000,1500000000
      */
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testPerformanceWithExpiredKeys(BucketLayout bucketLayout)
         throws IOException, InterruptedException, TimeoutException {
       final String volumeName = getTestName();
@@ -1043,14 +1107,14 @@ void testPerformanceWithExpiredKeys(BucketLayout 
bucketLayout)
       long initialKeyDeleted = metrics.getNumKeyDeleted().value();
       long initialDirIterated = metrics.getNumDirIterated().value();
       long initialDirDeleted = metrics.getNumDirDeleted().value();
-      int keyCount = 100;
+      final int keyCount = 10;
       // create keys
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, keyCount, 1, 
prefix, null);
       // check there are keys in keyTable
-      Thread.sleep(SERVICE_INTERVAL);
       assertEquals(keyCount, keyList.size());
-      assertEquals(keyCount, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == keyCount,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -1066,25 +1130,26 @@ void testPerformanceWithExpiredKeys(BucketLayout 
bucketLayout)
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, ruleList);
 
       GenericTestUtils.waitFor(
-          () -> (getDeletedKeyCount() - initialDeletedKeyCount) == keyCount, 
SERVICE_INTERVAL, 60000);
+          () -> (getDeletedKeyCount() - initialDeletedKeyCount) == keyCount, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
-      assertEquals(keyCount, metrics.getNumKeyDeleted().value() - 
initialKeyDeleted);
+      GenericTestUtils.waitFor(() -> metrics.getNumKeyDeleted().value() - 
initialKeyDeleted == keyCount,
+          SERVICE_INTERVAL, 5000);
       assertEquals(0, metrics.getNumDirIterated().value() - 
initialDirIterated);
       assertEquals(0, metrics.getNumDirDeleted().value() - initialDirDeleted);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
-    public Stream<Arguments> parameters4() {
+    public Stream<Arguments> parameters5() {
       return Stream.of(
-          arguments(BucketLayout.FILE_SYSTEM_OPTIMIZED, 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/"),
-          arguments(BucketLayout.FILE_SYSTEM_OPTIMIZED,
+          arguments(FILE_SYSTEM_OPTIMIZED, 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/"),
+          arguments(FILE_SYSTEM_OPTIMIZED,
               
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/"),
-          arguments(BucketLayout.FILE_SYSTEM_OPTIMIZED,
+          arguments(FILE_SYSTEM_OPTIMIZED,
               
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/"),
-          arguments(BucketLayout.FILE_SYSTEM_OPTIMIZED,
+          arguments(FILE_SYSTEM_OPTIMIZED,
               
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
                   "dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/"),
-          arguments(BucketLayout.FILE_SYSTEM_OPTIMIZED,
+          arguments(FILE_SYSTEM_OPTIMIZED,
               
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
                   
"dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/"),
           arguments(BucketLayout.OBJECT_STORE, 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/"),
@@ -1096,40 +1161,29 @@ public Stream<Arguments> parameters4() {
               
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
                   "dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/"),
           arguments(BucketLayout.OBJECT_STORE,
-              
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
-                  
"dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/"),
-          arguments(BucketLayout.LEGACY, 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/"),
-          arguments(BucketLayout.LEGACY,
-              
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/"),
-          arguments(BucketLayout.LEGACY,
-              
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/"),
-          arguments(BucketLayout.LEGACY,
-              
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
-                  "dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/"),
-          arguments(BucketLayout.LEGACY,
               
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
                   
"dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/")
       );
     }
 
     /**
-    * ozone.om.ratis.log.appender.queue.byte-limit default is 32MB.
-
-    * size 5900049 for 100000 keys like 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/*" (40 bytes path)
-    * size 8400049 for 100000 keys like
-     *        
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/*" 
(60 bytes path)
-    * size 11000049 for 100000 keys like
-     *       
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/*"
-     *       (80 bytes path)
-    * size 13600049 for 100000 keys like
-     *       
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
-    *        "dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" (100 bytes 
path)
-    * size 16200049 for 100000 keys like
-     *       
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
-    *        
"dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/" 
(120 bytes path)
-    */
+     * ozone.om.ratis.log.appender.queue.byte-limit default is 32MB.
+     * <p>
+     * size 5900049 for 100000 keys like 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/*" (40 bytes path)
+     * size 8400049 for 100000 keys like
+     * 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/*" 
(60 bytes path)
+     * size 11000049 for 100000 keys like
+     * 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/*"
+     * (80 bytes path)
+     * size 13600049 for 100000 keys like
+     * 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
+     * "dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" (100 bytes path)
+     * size 16200049 for 100000 keys like
+     * 
"dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/" +
+     * 
"dir6/dir7/dir8/dir9/dir10/dir1/dir2/dir3/dir4/dir5/dir6/dir7/dir8/dir9/dir10/" 
(120 bytes path)
+     */
     @ParameterizedTest
-    @MethodSource("parameters4")
+    @MethodSource("parameters5")
     void testPerformanceWithNestedDir(BucketLayout bucketLayout, String prefix)
         throws IOException, InterruptedException, TimeoutException {
       final String volumeName = getTestName();
@@ -1139,14 +1193,15 @@ void testPerformanceWithNestedDir(BucketLayout 
bucketLayout, String prefix)
       long initialKeyDeleted = metrics.getNumKeyDeleted().value();
       long initialDirIterated = metrics.getNumDirIterated().value();
       long initialDirDeleted = metrics.getNumDirDeleted().value();
-      int keyCount = 1000;
+      final int keyCount = 20;
       // create keys
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, keyCount, 1, 
prefix, null);
       // check there are keys in keyTable
       Thread.sleep(SERVICE_INTERVAL);
       assertEquals(keyCount, keyList.size());
-      assertEquals(keyCount, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == keyCount,
+          WAIT_CHECK_INTERVAL, 1000);
       // create Lifecycle configuration
       ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
@@ -1162,11 +1217,51 @@ void testPerformanceWithNestedDir(BucketLayout 
bucketLayout, String prefix)
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, ruleList);
 
       GenericTestUtils.waitFor(
-          () -> (getDeletedKeyCount() - initialDeletedKeyCount) == keyCount, 
SERVICE_INTERVAL, 10000);
+          () -> (getDeletedKeyCount() - initialDeletedKeyCount) == keyCount, 
WAIT_CHECK_INTERVAL, 10000);
       assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
-      assertEquals(keyCount, metrics.getNumKeyDeleted().value() - 
initialKeyDeleted);
+      GenericTestUtils.waitFor(() -> metrics.getNumKeyDeleted().value() - 
initialKeyDeleted == keyCount,
+          WAIT_CHECK_INTERVAL, 5000);
       assertEquals(0, metrics.getNumDirIterated().value() - 
initialDirIterated);
-      assertEquals(0, metrics.getNumDirDeleted().value() - initialDirDeleted);
+      assertEquals(bucketLayout == FILE_SYSTEM_OPTIMIZED ? 1 : 0,
+          metrics.getNumDirDeleted().value() - initialDirDeleted);
+      deleteLifecyclePolicy(volumeName, bucketName);
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
+    void testListMaxSize(BucketLayout bucketLayout) throws IOException,
+        TimeoutException, InterruptedException {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      String prefix = "key";
+      long initialDeletedKeyCount = getDeletedKeyCount();
+      long initialKeyCount = getKeyCount(bucketLayout);
+      final int keyCount = 500;
+      keyLifecycleService.setListMaxSize(100);
+      // create keys
+      List<OmKeyArgs> keyList =
+          createKeys(volumeName, bucketName, bucketLayout, keyCount, 1, 
prefix, null);
+      // check there are keys in keyTable
+      Thread.sleep(SERVICE_INTERVAL);
+      assertEquals(keyCount, keyList.size());
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == keyCount,
+          WAIT_CHECK_INTERVAL, 1000);
+
+      GenericTestUtils.setLogLevel(KeyLifecycleService.getLog(), Level.DEBUG);
+      GenericTestUtils.LogCapturer log =
+          GenericTestUtils.LogCapturer.captureLogs(
+              LoggerFactory.getLogger(KeyLifecycleService.class));
+      // create Lifecycle configuration
+      ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+      ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+      createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix, 
null, date.toString(), true);
+
+      GenericTestUtils.waitFor(() ->
+          (getDeletedKeyCount() - initialDeletedKeyCount) == keyCount, 
WAIT_CHECK_INTERVAL, 5000);
+      assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
+      GenericTestUtils.waitFor(() ->
+          log.getOutput().contains("LimitedSizeList has reached maximum size " 
+ 100), WAIT_CHECK_INTERVAL, 5000);
+      GenericTestUtils.setLogLevel(KeyLifecycleService.getLog(), Level.INFO);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
   }
@@ -1216,7 +1311,7 @@ void cleanup() {
      * owner change doesn't have impact, only the bucket deletion.
      */
     @ParameterizedTest
-    @EnumSource(BucketLayout.class)
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
     void testBucketDeleted(BucketLayout bucketLayout) throws IOException, 
InterruptedException {
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
@@ -1253,12 +1348,10 @@ void testBucketDeleted(BucketLayout bucketLayout) 
throws IOException, Interrupte
 
     public Stream<Arguments> parameters1() {
       return Stream.of(
-          arguments(BucketLayout.FILE_SYSTEM_OPTIMIZED, true),
-          arguments(BucketLayout.FILE_SYSTEM_OPTIMIZED, false),
+          arguments(FILE_SYSTEM_OPTIMIZED, true),
+          arguments(FILE_SYSTEM_OPTIMIZED, false),
           arguments(BucketLayout.OBJECT_STORE, true),
-          arguments(BucketLayout.OBJECT_STORE, false),
-          arguments(BucketLayout.LEGACY, true),
-          arguments(BucketLayout.LEGACY, false)
+          arguments(BucketLayout.OBJECT_STORE, false)
       );
     }
 
@@ -1293,7 +1386,7 @@ void testKeyDeletedOrRenamed(BucketLayout bucketLayout, 
boolean deleted)
 
       GenericTestUtils.waitFor(
           () -> log.getOutput().contains(KEY_COUNT + " expired keys and 0 
expired dirs found"),
-          SERVICE_INTERVAL, 10000);
+          WAIT_CHECK_INTERVAL, 10000);
       OmKeyArgs key = keyList.get(ThreadLocalRandom.current().nextInt(1, 
keyList.size()));
       // delete/rename another key before send deletion requests
       if (deleted) {
@@ -1307,13 +1400,13 @@ void testKeyDeletedOrRenamed(BucketLayout bucketLayout, 
boolean deleted)
       KeyLifecycleService.getInjector(1).resume();
       String expectedString = "Received a request to delete a Key does not 
exist /" + key.getVolumeName() + "/" +
           key.getBucketName() + "/" + key.getKeyName();
-      GenericTestUtils.waitFor(() -> 
requestLog.getOutput().contains(expectedString), SERVICE_INTERVAL, 10000);
+      GenericTestUtils.waitFor(() -> 
requestLog.getOutput().contains(expectedString), WAIT_CHECK_INTERVAL, 10000);
       if (!deleted) {
+        // Since expiration action is an absolute timestamp, the renamed 
key will expire in the next evaluation task
         GenericTestUtils.waitFor(() -> log.getOutput().contains("1 expired 
keys and 0 expired dirs found"),
             SERVICE_INTERVAL, 10000);
       }
-      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == 0, SERVICE_INTERVAL, 10000);
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - 
initialKeyCount == 0, WAIT_CHECK_INTERVAL, 10000);
       assertEquals(KEY_COUNT, getDeletedKeyCount() - initialDeletedKeyCount);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
@@ -1354,13 +1447,13 @@ void testUnsupportedPrefixForFSO(String prefix, boolean 
createPrefix) {
       if (createPrefix) {
         omException = assertThrows(
             OMException.class,
-            () -> createLifecyclePolicy(volumeName, bucketName, 
BucketLayout.FILE_SYSTEM_OPTIMIZED,
+            () -> createLifecyclePolicy(volumeName, bucketName, 
FILE_SYSTEM_OPTIMIZED,
                 prefix, null, date.toString(), true));
       } else {
         OmLCFilter.Builder filter = getOmLCFilterBuilder(prefix, null, null);
         omException = assertThrows(
             OMException.class,
-            () -> createLifecyclePolicy(volumeName, bucketName, 
BucketLayout.FILE_SYSTEM_OPTIMIZED,
+            () -> createLifecyclePolicy(volumeName, bucketName, 
FILE_SYSTEM_OPTIMIZED,
                 null, filter.build(), date.toString(), true));
       }
       assertSame(INVALID_REQUEST, omException.getResult());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to