This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-8342
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-8342 by this push:
     new d7077ea0ce6 HDDS-12789. Support move expired key to trash if trash is enabled (#8974)
d7077ea0ce6 is described below

commit d7077ea0ce639c61e675a8795250320b0d7659cb
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Sep 16 02:29:26 2025 +0800

    HDDS-12789. Support move expired key to trash if trash is enabled (#8974)
---
 .../apache/hadoop/fs/ozone/OzoneTrashPolicy.java   |   2 +-
 .../src/main/proto/OmClientProtocol.proto          |   1 +
 .../file/OMDirectoryCreateRequestWithFSO.java      |   4 +-
 .../om/request/key/OMKeyRenameRequestWithFSO.java  |   8 +
 .../ozone/om/service/KeyLifecycleService.java      | 337 +++++++++++++++++----
 .../om/service/KeyLifecycleServiceMetrics.java     |  25 +-
 .../ozone/om/service/TestKeyLifecycleService.java  | 186 +++++++++++-
 7 files changed, 482 insertions(+), 81 deletions(-)

diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/fs/ozone/OzoneTrashPolicy.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/fs/ozone/OzoneTrashPolicy.java
index f800d76b578..ffcde6e65a0 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/fs/ozone/OzoneTrashPolicy.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/fs/ozone/OzoneTrashPolicy.java
@@ -46,7 +46,7 @@ public class OzoneTrashPolicy extends TrashPolicyDefault {
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneTrashPolicy.class);
 
-  protected static final Path CURRENT = new Path("Current");
+  public static final Path CURRENT = new Path("Current");
 
   protected static final int MSECS_PER_MINUTE = 60 * 1000;
 
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 1a83bf4ff58..027f03a4ad7 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1352,6 +1352,7 @@ message RenameKeysResponse{
 message RenameKeyRequest{
     required KeyArgs keyArgs = 1;
     required string toKeyName = 2;
+    optional uint64 updateID = 3;
 }
 
 message RenameKeyResponse{
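
Note: the new optional updateID field turns RenameKey into a conditional (compare-and-rename) operation; the server-side check appears in the OMKeyRenameRequestWithFSO hunk that follows, and a hedged client-side sketch follows that hunk.
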
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
index 3ad7cbd17fe..9ebcccf460d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
@@ -210,13 +210,13 @@ private void logResult(CreateDirectoryRequest createDirectoryRequest,
       break;
     case DIRECTORY_ALREADY_EXISTS:
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Directory already exists. Volume:{}, Bucket:{}, Key{}",
+        LOG.debug("Directory already exists. Volume:{}, Bucket:{}, Key:{}",
             volumeName, bucketName, keyName, exception);
       }
       break;
     case FAILURE:
       omMetrics.incNumCreateDirectoryFails();
-      LOG.error("Directory creation failed. Volume:{}, Bucket:{}, Key{}. " +
+      LOG.error("Directory creation failed. Volume:{}, Bucket:{}, Key:{}. " +
           "Exception:{}", volumeName, bucketName, keyName, exception);
       break;
     default:
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
index cbab201b55e..0e8fd17e7bb 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
@@ -121,6 +121,14 @@ public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, Execut
         throw new OMException("Key not found " + fromKeyName, KEY_NOT_FOUND);
       }
 
+      if (renameKeyRequest.hasUpdateID()) {
+        if (fromKeyFileStatus.getKeyInfo().getUpdateID() != renameKeyRequest.getUpdateID()) {
+          throw new OMException("UpdateID does not match. Key: " + fromKeyName +
+              ", Expected UpdateID: " + fromKeyFileStatus.getKeyInfo().getUpdateID() +
+              ", Given UpdateID: " + renameKeyRequest.getUpdateID(),
+              OMException.ResultCodes.UPDATE_ID_NOT_MATCH);
+        }
+      }
+
       if (fromKeyFileStatus.getKeyInfo().isHsync()) {
         throw new OMException("Open file cannot be renamed since it is " +
             "hsync'ed: volumeName=" + volumeName + ", bucketName=" +
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
index 80d267cc84f..09b0c55efee 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
@@ -17,17 +17,21 @@
 
 package org.apache.hadoop.ozone.om.service;
 
+import static org.apache.hadoop.fs.FileSystem.TRASH_PREFIX;
+import static org.apache.hadoop.fs.ozone.OzoneTrashPolicy.CURRENT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -36,7 +40,6 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.utils.BackgroundService;
@@ -58,15 +61,22 @@
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmLCRule;
 import org.apache.hadoop.ozone.om.helpers.OmLifecycleConfiguration;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyArgs;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyError;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeyRequest;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.protocol.ClientId;
 import org.slf4j.Logger;
@@ -214,6 +224,8 @@ public final class LifecycleActionTask implements BackgroundTask {
     private long numDirDeleted = 0;
     private long numKeyDeleted = 0;
     private long sizeKeyDeleted = 0;
+    private long numKeyRenamed = 0;
+    private long sizeKeyRenamed = 0;
 
     public LifecycleActionTask(OmLifecycleConfiguration lcConfig) {
       this.policy = lcConfig;
@@ -293,23 +305,23 @@ public BackgroundTaskResult call() {
         }
 
         if (expiredKeyList.isEmpty() && expiredDirList.isEmpty()) {
-          LOG.info("No expired keys/dirs found for bucket {}", bucketKey);
+          LOG.info("No expired keys/dirs found or remaining for bucket {}", bucketKey);
           onSuccess(bucketKey);
           return result;
         }
 
-        LOG.info("{} expired keys and {} expired dirs found for bucket {}",
+        LOG.info("{} expired keys and {} expired dirs found and remaining for bucket {}",
             expiredKeyList.size(), expiredDirList.size(), bucketKey);
 
         // If trash is enabled, move files to trash instead of sending delete requests.
         // OBS bucket doesn't support trash.
-        if (bucket.getBucketLayout() == BucketLayout.OBJECT_STORE) {
+        if (bucket.getBucketLayout() == OBJECT_STORE) {
          sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), bucket.getBucketName(),
               expiredKeyList, false);
         } else if (ozoneTrash != null) {
           // move keys to trash
-          // TODO: add unit test in next patch
-          moveKeysToTrash(bucket.getVolumeName(), bucket.getBucketName(), expiredKeyList);
+          // TODO: move directory to trash in next patch
+          moveKeysToTrash(bucket, expiredKeyList);
         } else {
          sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), bucket.getBucketName(), expiredKeyList, false);
           if (!expiredDirList.isEmpty()) {
@@ -327,6 +339,9 @@ public BackgroundTaskResult call() {
     private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket, String bucketKey,
         Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
         LimitedExpiredObjectList expiredKeyList, LimitedExpiredObjectList expiredDirList) {
+      List<OmLCRule> prefixStartsWithTrashRuleList =
+          ruleList.stream().filter(r -> r.isPrefixEnable() && r.getEffectivePrefix().startsWith(
+              TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX)).collect(Collectors.toList());
       List<OmLCRule> directoryStylePrefixRuleList =
           ruleList.stream().filter(r -> r.isDirectoryStylePrefix()).collect(Collectors.toList());
       List<OmLCRule> nonDirectoryStylePrefixRuleList =
@@ -335,6 +350,11 @@ private void evaluateFSOBucket(OmVolumeArgs volume, 
OmBucketInfo bucket, String
       List<OmLCRule> noPrefixRuleList =
           ruleList.stream().filter(r -> !r.isPrefixEnable()).collect(Collectors.toList());
 
+      directoryStylePrefixRuleList.removeAll(prefixStartsWithTrashRuleList);
+      nonDirectoryStylePrefixRuleList.removeAll(prefixStartsWithTrashRuleList);
+      prefixStartsWithTrashRuleList.stream().forEach(
+          r -> LOG.info("Skip rule {} as its prefix starts with {}", r, TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX));
+
       Table<String, OmDirectoryInfo> directoryInfoTable = omMetadataManager.getDirectoryTable();
       for (OmLCRule rule : directoryStylePrefixRuleList) {
         // find KeyInfo of each directory for prefix
@@ -360,11 +380,10 @@ private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket, String
               directoryPath.toString().equals(rule.getEffectivePrefix() + OM_KEY_PREFIX))
               && rule.match(lastDir, directoryPath.toString())) {
             if (expiredDirList.isFull()) {
-              // if expiredDirList is full, send delete request for pending deletion directories
-              sendDeleteKeysRequestAndClearList(volume.getVolume(), bucket.getBucketName(),
-                  expiredDirList, true);
+              // if expiredDirList is full, send delete/rename request for expired directories
+              handleAndClearFullList(bucket, expiredDirList, true);
            }
-            expiredDirList.add(directoryPath.toString(), lastDir.getUpdateID());
+            expiredDirList.add(directoryPath.toString(), 0, lastDir.getUpdateID());
           }
         }
 
@@ -380,14 +399,18 @@ private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket, String
         String prefix = OM_KEY_PREFIX + volume.getObjectID() +
             OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
         if (dirInfo != null) {
-          prefix += dirInfo.getObjectID();
-          if (dirInfo.getName().equals(rule.getEffectivePrefix()) && rule.match(dirInfo, dirInfo.getName())) {
-            if (expiredDirList.isFull()) {
-              // if expiredDirList is full, send delete request for pending deletion directories
-              sendDeleteKeysRequestAndClearList(volume.getVolume(), bucket.getBucketName(),
-                  expiredDirList, true);
+          if (!dirInfo.getName().equals(TRASH_PREFIX)) {
+            prefix += dirInfo.getObjectID();
+            if (dirInfo.getName().equals(rule.getEffectivePrefix()) && rule.match(dirInfo, dirInfo.getName())) {
+              if (expiredDirList.isFull()) {
+                // if expiredDirList is full, send delete/rename request for expired directories
+                handleAndClearFullList(bucket, expiredDirList, true);
+              }
+              expiredDirList.add(dirInfo.getName(), 0, dirInfo.getUpdateID());
             }
-            expiredDirList.add(dirInfo.getName(), dirInfo.getUpdateID());
+          } else {
+            dirInfo = null;
+            LOG.info("Skip evaluating trash directory {}", TRASH_PREFIX);
           }
         }
         LOG.info("Prefix {} for {}", prefix, bucketKey);
@@ -412,12 +435,10 @@ private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket, String
               if (rule.match(key)) {
                 // mark key as expired, check next key
                 if (expiredKeyList.isFull()) {
-                  // if expiredKeyList is full, send delete request for pending deletion keys
-                  sendDeleteKeysRequestAndClearList(volume.getVolume(), bucket.getBucketName(),
-                      expiredKeyList, false);
+                  // if expiredKeyList is full, send delete/rename request for expired keys
+                  handleAndClearFullList(bucket, expiredKeyList, false);
                 }
-                expiredKeyList.add(key.getKeyName(), key.getUpdateID());
-                sizeKeyDeleted += key.getReplicatedSize();
+                expiredKeyList.add(key.getKeyName(), key.getReplicatedSize(), key.getUpdateID());
                 break;
               }
             }
@@ -433,15 +454,18 @@ private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket, String
             Table.KeyValue<String, OmDirectoryInfo> entry = dirTblItr.next();
             OmDirectoryInfo dir = entry.getValue();
             numDirIterated++;
+            // skip TRASH_PREFIX directory
+            if (dir.getName().equals(TRASH_PREFIX)) {
+              continue;
+            }
             for (OmLCRule rule : noPrefixRuleList) {
               if (rule.match(dir, dir.getPath())) {
                 // mark directory as expired, check next directory
                 if (expiredDirList.isFull()) {
-                  // if expiredDirList is full, send delete request for pending deletion directories
-                  sendDeleteKeysRequestAndClearList(volume.getVolume(), bucket.getBucketName(),
-                      expiredDirList, true);
+                  // if expiredDirList is full, send delete/rename request for expired directories
+                  handleAndClearFullList(bucket, expiredDirList, true);
                 }
-                expiredDirList.add(dir.getPath(), dir.getUpdateID());
+                expiredDirList.add(dir.getPath(), 0, dir.getUpdateID());
                 break;
               }
             }
@@ -467,11 +491,10 @@ private void evaluateKeyTable(Table<String, OmKeyInfo> keyTable, String prefix,
           if (rule.match(key, keyPath)) {
             // mark key as expired, check next key
             if (keyList.isFull()) {
-              // if keyList is full, send delete request for pending deletion keys
-              sendDeleteKeysRequestAndClearList(volumeName, bucketName, keyList, false);
+              // if keyList is full, send delete/rename request for expired keys
+              handleAndClearFullList(bucket, keyList, false);
             }
-            keyList.add(keyPath, key.getUpdateID());
-            sizeKeyDeleted += key.getReplicatedSize();
+            keyList.add(keyPath, key.getReplicatedSize(), key.getUpdateID());
           }
         }
       } catch (IOException e) {
@@ -489,15 +512,19 @@ private void evaluateDirTable(Table<String, OmDirectoryInfo> directoryInfoTable,
         while (dirTblItr.hasNext()) {
           Table.KeyValue<String, OmDirectoryInfo> entry = dirTblItr.next();
           OmDirectoryInfo dir = entry.getValue();
-          String dirPath = directoryPath + dir.getName();
           numDirIterated++;
+          // skip TRASH_PREFIX directory
+          if (dir.getName().equals(TRASH_PREFIX)) {
+            continue;
+          }
+          String dirPath = directoryPath + dir.getName();
           if (rule.match(dir, dirPath)) {
             // mark dir as expired, check next key
             if (dirList.isFull()) {
-              // if dirList is full, send delete request for pending deletion directories
-              sendDeleteKeysRequestAndClearList(volumeName, bucketName, dirList, true);
+              // if dirList is full, send delete/rename request for expired directories
+              handleAndClearFullList(bucket, dirList, true);
             }
-            dirList.add(dirPath, dir.getUpdateID());
+            dirList.add(dirPath, 0, dir.getUpdateID());
           }
         }
       } catch (IOException e) {
@@ -510,6 +537,16 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
         Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList, LimitedExpiredObjectList expiredKeyList) {
       String volumeName = bucketInfo.getVolumeName();
       String bucketName = bucketInfo.getBucketName();
+
+      if (bucketInfo.getBucketLayout() == BucketLayout.LEGACY) {
+        List<OmLCRule> prefixStartsWithTrashRuleList =
+            ruleList.stream().filter(r -> r.isPrefixEnable() && r.getEffectivePrefix().startsWith(
+                TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX)).collect(Collectors.toList());
+        ruleList.removeAll(prefixStartsWithTrashRuleList);
+        prefixStartsWithTrashRuleList.stream().forEach(
+            r -> LOG.info("Skip rule {} as its prefix starts with {}", r, TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX));
+      }
+
       // use bucket name as key iterator prefix
       try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyTblItr =
                keyTable.iterator(omMetadataManager.getBucketKey(volumeName, bucketName))) {
@@ -517,15 +554,19 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
           Table.KeyValue<String, OmKeyInfo> keyValue = keyTblItr.next();
           OmKeyInfo key = keyValue.getValue();
           numKeyIterated++;
+          if (bucketInfo.getBucketLayout() == BucketLayout.LEGACY &&
+              key.getKeyName().startsWith(TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX)) {
+            LOG.info("Skip evaluating trash directory {} and all its child files and subdirectories", TRASH_PREFIX);
+            continue;
+          }
           for (OmLCRule rule : ruleList) {
             if (rule.match(key)) {
               // mark key as expired, check next key
               if (expiredKeyList.isFull()) {
-                // if expiredKeyList is full, send delete request for pending deletion keys
-                sendDeleteKeysRequestAndClearList(volumeName, bucketName, expiredKeyList, false);
+                // if expiredKeyList is full, send delete/rename request for expired keys
+                handleAndClearFullList(bucketInfo, expiredKeyList, false);
               }
-              expiredKeyList.add(key.getKeyName(), key.getUpdateID());
-              sizeKeyDeleted += key.getReplicatedSize();
+              expiredKeyList.add(key.getKeyName(), key.getReplicatedSize(), key.getUpdateID());
               break;
             }
           }
@@ -551,7 +592,7 @@ private OmDirectoryInfo getDirectory(OmVolumeArgs volume, OmBucketInfo bucket, S
      * If prefix is /dir1/dir2, but dir1 doesn't exist, then it will return exception.
      * If prefix is /dir1/dir2, but dir2 doesn't exist, then it will return a list with dir1 only.
      * If prefix is /dir1/dir2, although dir1 exists, but get(dir1) failed with IOException,
-     *   then it will return exception too.
+     * then it will return exception too.
      */
     private List<OmDirectoryInfo> getDirList(OmVolumeArgs volume, OmBucketInfo bucket, String prefix, String bucketKey)
         throws IOException {
@@ -588,6 +629,8 @@ private List<OmDirectoryInfo> getDirList(OmVolumeArgs volume, OmBucketInfo bucke
     private void onFailure(String bucketName) {
       inFlight.remove(bucketName);
       metrics.incrNumFailureTask();
+      metrics.incNumKeyIterated(numKeyIterated);
+      metrics.incNumDirIterated(numDirIterated);
     }
 
     private void onSuccess(String bucketName) {
@@ -597,9 +640,17 @@ private void onSuccess(String bucketName) {
       metrics.incTaskLatencyMs(timeSpent);
       metrics.incNumKeyIterated(numKeyIterated);
       metrics.incNumDirIterated(numDirIterated);
-      metrics.incrSizeKeyDeleted(sizeKeyDeleted);
-      LOG.info("Spend {} ms on bucket {} to iterate {} keys and {} dirs, deleted {} keys with {} bytes, and {} dirs",
-          timeSpent, bucketName, numKeyIterated, numDirIterated, numKeyDeleted, sizeKeyDeleted, numDirDeleted);
+      LOG.info("Spent {} ms on bucket {} to iterate {} keys and {} dirs, deleted {} keys with {} bytes, " +
+          "and {} dirs, renamed {} keys with {} bytes to trash", timeSpent, bucketName, numKeyIterated, numDirIterated,
+          numKeyDeleted, sizeKeyDeleted, numDirDeleted, numKeyRenamed, sizeKeyRenamed);
+    }
+
+    private void handleAndClearFullList(OmBucketInfo bucket, LimitedExpiredObjectList keysList, boolean dir) {
+      if (bucket.getBucketLayout() != OBJECT_STORE && ozoneTrash != null) {
+        moveKeysToTrash(bucket, keysList);
+      } else {
+        sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), bucket.getBucketName(), keysList, dir);
+      }
     }
 
     private void sendDeleteKeysRequestAndClearList(String volume, String bucket,
@@ -641,46 +692,186 @@ private void sendDeleteKeysRequestAndClearList(String volume, String bucket,
                 getOzoneManager(), omRequest, clientId, callId.getAndIncrement());
             long endTime = System.nanoTime();
             LOG.debug("DeleteKeys request with {} keys cost {} ns", keyCount, endTime - startTime);
-            i += batchSize;
-            startIndex += batchSize;
+            long deletedCount = keyCount;
+            long deletedSize = keysList.replicatedSizeSubList(startIndex, endIndex)
+                .stream().mapToLong(Long::longValue).sum();
             if (response != null) {
               if (!response.getSuccess()) {
                 // log the failure and continue the iterating
-                LOG.error("DeleteKeys request failed with " + response.getMessage() +
-                    " for volume: {}, bucket: {}", volume, bucket);
-                continue;
+                LOG.error("DeleteKeys request failed with status {} for volume: {}, bucket: {}",
+                    response.getStatus(), volume, bucket);
+                if (response.getDeleteKeysResponse().hasUnDeletedKeys()) {
+                  DeleteKeyArgs unDeletedKeys = response.getDeleteKeysResponse().getUnDeletedKeys();
+                  for (String key : unDeletedKeys.getKeysList()) {
+                    Long size = keysList.getReplicatedSize(key);
+                    if (size == null) {
+                      LOG.error("Undeleted key {}/{}/{} is not in keysList", volume, bucket, key);
+                      continue;
+                    }
+                    deletedCount -= 1;
+                    deletedSize -= size;
+                  }
+                }
+                for (DeleteKeyError e : response.getDeleteKeysResponse().getErrorsList()) {
+                  Long size = keysList.getReplicatedSize(e.getKey());
+                  if (size == null) {
+                    LOG.error("Key {}/{}/{} with delete error is not in keysList", volume, bucket, e.getKey());
+                    continue;
+                  }
+                  deletedCount -= 1;
+                  deletedSize -= size;
+                }
               } else {
                 LOG.debug("DeleteKeys request of total {} keys, {} not deleted", keyCount,
                     response.getDeleteKeysResponse().getErrorsCount());
               }
             }
             if (dir) {
-              numDirDeleted += keyCount;
-              metrics.incrNumDirDeleted(keyCount);
+              numDirDeleted += deletedCount;
+              metrics.incrNumDirDeleted(deletedCount);
             } else {
-              numKeyDeleted += keyCount;
-              metrics.incrNumKeyDeleted(keyCount);
+              numKeyDeleted += deletedCount;
+              sizeKeyDeleted += deletedSize;
+              metrics.incrNumKeyDeleted(deletedCount);
+              metrics.incrSizeKeyDeleted(deletedSize);
             }
+            i += keyCount;
+            startIndex += keyCount;
           } else {
             batchSize /= 2;
           }
         }
-        keysList.clear();
       } catch (ServiceException e) {
         LOG.error("Failed to send DeleteKeysRequest", e);
+      } finally {
+        keysList.clear();
       }
     }
 
-    private void moveKeysToTrash(String volume, String bucket, LimitedExpiredObjectList keysList) {
-      for (int index = 0; index < keysList.size(); index++) {
+    private void moveKeysToTrash(OmBucketInfo bucket, LimitedExpiredObjectList keysList) {
+      if (keysList.isEmpty()) {
+        return;
+      }
+      String volumeName = bucket.getVolumeName();
+      String bucketName = bucket.getBucketName();
+      String trashCurrent;
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(bucket.getOwner());
+      try {
+        trashCurrent = checkAndCreateTrashDirectoryIfNeeded(bucket, ugi);
+      } catch (IOException e) {
+        keysList.clear();
+        return;
+      }
+
+      for (int i = 0; i < keysList.size(); i++) {
+        String keyName = keysList.getName(i);
+        String targetKeyName = trashCurrent + OM_KEY_PREFIX + keyName;
+        KeyArgs keyArgs = KeyArgs.newBuilder().setKeyName(keyName)
+            .setVolumeName(volumeName).setBucketName(bucketName).build();
+
+        /*
+         * Trash examples:
+         * /s3v/test/readme -> /s3v/test/.Trash/hadoop/Current/readme
+         * /s3v/test/dir1/readme -> /s3v/test/.Trash/hadoop/Current/dir1/readme
+         */
+        RenameKeyRequest renameKeyRequest = RenameKeyRequest.newBuilder()
+            .setKeyArgs(keyArgs)
+            .setToKeyName(targetKeyName)
+            .setUpdateID(keysList.getUpdateID(i))
+            .build();
+
+        // send request out
+        OMRequest omRequest = OMRequest.newBuilder()
+            .setCmdType(OzoneManagerProtocolProtos.Type.RenameKey)
+            .setVersion(ClientVersion.CURRENT_VERSION)
+            .setClientId(clientId.toString())
+            .setRenameKeyRequest(renameKeyRequest)
+            .build();
         try {
-          ozoneTrash.moveToTrash(new Path(OM_KEY_PREFIX + volume + OM_KEY_PREFIX + bucket + OM_KEY_PREFIX +
-              keysList.getName(index)));
-        } catch (IOException e) {
-          // log failure and continue
-          LOG.warn("Failed to move key {} to trash", keysList.getName(index), e);
+          // perform preExecute here because the Ratis submit path does not perform it
+          OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+          OzoneManagerProtocolProtos.OMResponse omResponse =
+              ugi.doAs(new PrivilegedExceptionAction<OzoneManagerProtocolProtos.OMResponse>() {
+                @Override
+                public OzoneManagerProtocolProtos.OMResponse run() throws Exception {
+                  OMRequest request = omClientRequest.preExecute(ozoneManager);
+                  return OzoneManagerRatisUtils.submitRequest(getOzoneManager(),
+                      request, clientId, callId.getAndIncrement());
+                }
+              });
+          if (omResponse != null) {
+            if (!omResponse.getSuccess()) {
+              // log the failure and continue iterating
+              LOG.error("RenameKey request failed with source key: {}, dest key: {}", keyName, targetKeyName);
+              continue;
+            }
+          }
+          LOG.debug("RenameKey request succeeded with source key: {}, dest key: {}", keyName, targetKeyName);
+          numKeyRenamed += 1;
+          sizeKeyRenamed += keysList.getReplicatedSize(i);
+          metrics.incrNumKeyRenamed(1);
+          metrics.incrSizeKeyRenamed(keysList.getReplicatedSize(i));
+        } catch (InterruptedException | IOException e) {
+          LOG.error("Failed to send RenameKeysRequest", e);
         }
       }
+      keysList.clear();
+    }
+
+    private String checkAndCreateTrashDirectoryIfNeeded(OmBucketInfo bucket, UserGroupInformation ugi)
+        throws IOException {
+      String userTrashRoot = TRASH_PREFIX + OM_KEY_PREFIX + bucket.getOwner();
+      String userTrashCurrent = userTrashRoot + OM_KEY_PREFIX + CURRENT;
+      try {
+        OmKeyArgs key = new OmKeyArgs.Builder().setVolumeName(bucket.getVolumeName())
+            .setBucketName(bucket.getBucketName()).setKeyName(userTrashCurrent)
+            .setOwnerName(bucket.getOwner()).build();
+        ozoneManager.getFileStatus(key);
+        return userTrashCurrent;
+      } catch (IOException e) {
+        if (e instanceof OMException &&
+            (((OMException) e).getResult() == OMException.ResultCodes.FILE_NOT_FOUND ||
+                ((OMException) e).getResult() == OMException.ResultCodes.DIRECTORY_NOT_FOUND)) {
+          // create the trash/Current directory for user
+          KeyArgs keyArgs = KeyArgs.newBuilder().setVolumeName(bucket.getVolumeName())
+              .setBucketName(bucket.getBucketName()).setKeyName(userTrashCurrent)
+              .setOwnerName(bucket.getOwner()).setRecursive(true).build();
+          OMRequest omRequest = OMRequest.newBuilder().setCreateDirectoryRequest(
+                  CreateDirectoryRequest.newBuilder().setKeyArgs(keyArgs))
+              .setCmdType(OzoneManagerProtocolProtos.Type.CreateDirectory)
+              .setVersion(ClientVersion.CURRENT_VERSION)
+              .setClientId(clientId.toString())
+              .build();
+          try {
+            // perform preExecute here because the Ratis submit path does not perform it
+            final OMClientRequest omClientRequest = OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+            OzoneManagerProtocolProtos.OMResponse omResponse =
+                ugi.doAs(new PrivilegedExceptionAction<OzoneManagerProtocolProtos.OMResponse>() {
+                  @Override
+                  public OzoneManagerProtocolProtos.OMResponse run() throws Exception {
+                    OMRequest request = omClientRequest.preExecute(ozoneManager);
+                    return OzoneManagerRatisUtils.submitRequest(getOzoneManager(),
+                        request, clientId, callId.getAndIncrement());
+                  }
+                });
+
+            if (omResponse != null) {
+              if (!omResponse.getSuccess()) {
+                LOG.error("CreateDirectory request failed with {}, path: {}",
+                    omResponse.getMessage(), userTrashCurrent);
+                throw new IOException("Failed to create trash directory " + userTrashCurrent);
+              }
+              LOG.debug("Created trash current directory: {}", userTrashCurrent);
+              return userTrashCurrent;
+            }
+          } catch (InterruptedException | IOException e1) {
+            LOG.error("Failed to send CreateDirectoryRequest for {}", userTrashCurrent, e1);
+            throw new IOException("Failed to send CreateDirectoryRequest for " + userTrashCurrent);
+          }
+        }
+        LOG.error("Failed to get trash current directory {} status", userTrashCurrent, e);
+        throw e;
+      }
     }
   }
 
@@ -704,20 +895,28 @@ public void setListMaxSize(int size) {
     this.listMaxSize = size;
   }
 
+  @VisibleForTesting
+  public void setOzoneTrash(OzoneTrash ozoneTrash) {
+    this.ozoneTrash = ozoneTrash;
+  }
+
   /**
    * An in-memory list with limited size to hold expired object infos, including object name and current update ID.
    */
   public static class LimitedExpiredObjectList {
     private final LimitedSizeList<String> objectNames;
+    private final List<Long> objectReplicatedSize;
     private final List<Long> objectUpdateIDs;
 
     public LimitedExpiredObjectList(int maxListSize) {
       this.objectNames = new LimitedSizeList<>(maxListSize);
+      this.objectReplicatedSize = new ArrayList<>();
       this.objectUpdateIDs = new ArrayList<>();
     }
 
-    public void add(String name, long updateID) {
+    public void add(String name, long size, long updateID) {
       objectNames.add(name);
+      objectReplicatedSize.add(size);
       objectUpdateIDs.add(updateID);
     }
 
@@ -733,9 +932,23 @@ public List<Long> updateIDSubList(int fromIndex, int toIndex) {
       return objectUpdateIDs.subList(fromIndex, toIndex);
     }
 
+    public List<Long> replicatedSizeSubList(int fromIndex, int toIndex) {
+      return objectReplicatedSize.subList(fromIndex, toIndex);
+    }
+
+    public Long getReplicatedSize(String keyName) {
+      for (int index = 0; index < objectNames.size(); index++) {
+        if (objectNames.get(index).equals(keyName)) {
+          return objectReplicatedSize.get(index);
+        }
+      }
+      return null;
+    }
+
     public void clear() {
       objectNames.clear();
       objectUpdateIDs.clear();
+      objectReplicatedSize.clear();
     }
 
     public boolean isEmpty() {
@@ -753,6 +966,10 @@ public String getName(int index) {
     public long getUpdateID(int index) {
       return objectUpdateIDs.get(index);
     }
+
+    public long getReplicatedSize(int index) {
+      return objectReplicatedSize.get(index);
+    }
   }
 
   /**
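
To summarize the layout that moveKeysToTrash() and checkAndCreateTrashDirectoryIfNeeded() build above, here is a minimal sketch of the path composition (TRASH_PREFIX, OM_KEY_PREFIX and CURRENT are the constants imported by this patch; bucketOwner and keyName are illustrative placeholders):

    // For owner "hadoop" and key "dir1/readme":
    String userTrashRoot    = TRASH_PREFIX + OM_KEY_PREFIX + bucketOwner;   // ".Trash/hadoop"
    String userTrashCurrent = userTrashRoot + OM_KEY_PREFIX + CURRENT;      // ".Trash/hadoop/Current"
    String targetKeyName    = userTrashCurrent + OM_KEY_PREFIX + keyName;   // ".Trash/hadoop/Current/dir1/readme"
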
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
index 104be2995cc..77efe4f2287 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
@@ -70,16 +70,21 @@ public static void unregister() {
   @Metric("Execution time of a success task for a bucket")
   private MutableRate taskLatencyMs;
 
-  @Metric("Number of key iterated of a success task for a bucket")
+  // The following metrics are updated by both successful and failed tasks.
+  @Metric("Number of keys iterated")
   private MutableGaugeLong numKeyIterated;
-  @Metric("Number of dir iterated of a success task for a bucket")
+  @Metric("Number of dirs iterated")
   private MutableGaugeLong numDirIterated;
   @Metric("Total directories deleted")
   private MutableGaugeLong numDirDeleted;
   @Metric("Total keys deleted")
   private MutableGaugeLong numKeyDeleted;
+  @Metric("Total keys renamed")
+  private MutableGaugeLong numKeyRenamed;
   @Metric("Total size of keys deleted")
   private MutableGaugeLong sizeKeyDeleted;
+  @Metric("Total size of keys renamed")
+  private MutableGaugeLong sizeKeyRenamed;
 
   public void incrNumSkippedTask() {
     numSkippedTask.incr();
@@ -101,10 +106,18 @@ public void incrNumKeyDeleted(long keyCount) {
     numKeyDeleted.incr(keyCount);
   }
 
+  public void incrNumKeyRenamed(long keyCount) {
+    numKeyRenamed.incr(keyCount);
+  }
+
   public void incrSizeKeyDeleted(long size) {
     sizeKeyDeleted.incr(size);
   }
 
+  public void incrSizeKeyRenamed(long size) {
+    sizeKeyRenamed.incr(size);
+  }
+
   public MutableGaugeLong getNumDirDeleted() {
     return numDirDeleted;
   }
@@ -113,10 +126,18 @@ public MutableGaugeLong getNumKeyDeleted() {
     return numKeyDeleted;
   }
 
+  public MutableGaugeLong getNumKeyRenamed() {
+    return numKeyRenamed;
+  }
+
   public MutableGaugeLong getSizeKeyDeleted() {
     return sizeKeyDeleted;
   }
 
+  public MutableGaugeLong getSizeKeyRenamed() {
+    return sizeKeyRenamed;
+  }
+
   public MutableGaugeLong getNumDirIterated() {
     return numDirIterated;
   }
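
A hedged usage sketch of the new gauges: they are read the same way as the existing delete gauges (the tests below call metrics.getNumKeyRenamed().value()), though how the metrics instance is obtained is assumed here:

    long renamedKeys  = metrics.getNumKeyRenamed().value();   // keys moved to trash
    long renamedBytes = metrics.getSizeKeyRenamed().value();  // replicated bytes moved to trash
    long deletedKeys  = metrics.getNumKeyDeleted().value();
    long deletedBytes = metrics.getSizeKeyDeleted().value();
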
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
index 7df7e244b76..4a665a28ad0 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
@@ -17,14 +17,20 @@
 
 package org.apache.hadoop.ozone.om.service;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.fs.FileSystem.TRASH_PREFIX;
+import static org.apache.hadoop.fs.ozone.OzoneTrashPolicy.CURRENT;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
+import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
+import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -38,6 +44,7 @@
 import com.google.common.collect.ImmutableMap;
 import java.io.File;
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
 import java.util.ArrayList;
@@ -53,6 +60,7 @@
 import java.util.stream.Stream;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -67,7 +75,9 @@
 import org.apache.hadoop.ozone.om.OMMetadataManager;
 import org.apache.hadoop.ozone.om.OmTestManagers;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.OzoneTrash;
 import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
+import org.apache.hadoop.ozone.om.TrashOzoneFileSystem;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
@@ -92,6 +102,8 @@
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 import org.apache.hadoop.ozone.security.acl.OzoneObj;
 import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.OzoneTestBase;
 import org.apache.ratis.util.ExitUtils;
@@ -128,7 +140,7 @@ class TestKeyLifecycleService extends OzoneTestBase {
       LoggerFactory.getLogger(TestKeyLifecycleService.class);
   private static final AtomicInteger OBJECT_COUNTER = new AtomicInteger();
   private static final AtomicInteger OBJECT_ID_COUNTER = new AtomicInteger();
-  private static final int KEY_COUNT = 10;
+  private static final int KEY_COUNT = 2;
   private static final int EXPIRE_SECONDS = 2;
   private static final int SERVICE_INTERVAL = 300;
   private static final int WAIT_CHECK_INTERVAL = 50;
@@ -194,6 +206,7 @@ void setup(@TempDir File testDir) throws Exception {
 
     @AfterEach
     void resume() {
+      keyLifecycleService.setOzoneTrash(null);
     }
 
     @AfterAll
@@ -294,7 +307,8 @@ void testOnlyKeyExpired(BucketLayout bucketLayout, boolean createPrefix) throws
       long initialKeyCount = getKeyCount(bucketLayout);
 
       // Create the key
-      createVolumeAndBucket(volumeName, bucketName, bucketLayout, false);
+      createVolumeAndBucket(volumeName, bucketName, bucketLayout,
+          UserGroupInformation.getCurrentUser().getShortUserName());
       OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, uniqueObjectName(prefix), 1, null);
 
       // create Lifecycle configuration
@@ -879,7 +893,8 @@ void testExpireOnlyDirectory(String dirName, String prefix, int dirDepth, int de
       long initialNumDeletedDir = metrics.getNumDirDeleted().value();
 
       // Create the directory
-      createVolumeAndBucket(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, false);
+      createVolumeAndBucket(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED,
+          UserGroupInformation.getCurrentUser().getShortUserName());
       createDirectory(volumeName, bucketName, dirName);
       KeyInfoWithVolumeContext keyInfo = getDirectory(volumeName, bucketName, dirName);
       assertFalse(keyInfo.getKeyInfo().isFile());
@@ -1038,7 +1053,7 @@ void testKeyUpdatedShouldNotGetDeleted(BucketLayout bucketLayout)
       KeyLifecycleService.getInjector(0).resume();
 
       GenericTestUtils.waitFor(
-          () -> log.getOutput().contains(KEY_COUNT + " expired keys and 0 expired dirs found"),
+          () -> log.getOutput().contains(KEY_COUNT + " expired keys and 0 expired dirs found and remaining"),
           WAIT_CHECK_INTERVAL, 10000);
 
       OmKeyArgs key = keyList.get(ThreadLocalRandom.current().nextInt(keyList.size()));
@@ -1227,17 +1242,29 @@ void testPerformanceWithNestedDir(BucketLayout bucketLayout, String prefix)
       deleteLifecyclePolicy(volumeName, bucketName);
     }
 
+    public Stream<Arguments> parameters6() {
+      return Stream.of(
+          arguments("FILE_SYSTEM_OPTIMIZED", true),
+          arguments("FILE_SYSTEM_OPTIMIZED", false),
+          arguments("LEGACY", true),
+          arguments("LEGACY", false),
+          arguments("OBJECT_STORE", true),
+          arguments("OBJECT_STORE", false));
+    }
+
     @ParameterizedTest
-    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
-    void testListMaxSize(BucketLayout bucketLayout) throws IOException,
+    @MethodSource("parameters6")
+    void testListMaxSize(BucketLayout bucketLayout, boolean enableTrash) throws IOException,
         TimeoutException, InterruptedException {
       final String volumeName = getTestName();
       final String bucketName = uniqueObjectName("bucket");
       String prefix = "key";
       long initialDeletedKeyCount = getDeletedKeyCount();
       long initialKeyCount = getKeyCount(bucketLayout);
-      final int keyCount = 500;
-      keyLifecycleService.setListMaxSize(100);
+      long initialRenamedKeyCount = metrics.getNumKeyRenamed().value();
+      final int keyCount = 100;
+      final int maxListSize = 20;
+      keyLifecycleService.setListMaxSize(maxListSize);
       // create keys
       List<OmKeyArgs> keyList =
           createKeys(volumeName, bucketName, bucketLayout, keyCount, 1, prefix, null);
@@ -1247,6 +1274,15 @@ void testListMaxSize(BucketLayout bucketLayout) throws IOException,
       GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - initialKeyCount == keyCount,
           WAIT_CHECK_INTERVAL, 1000);
 
+      if (enableTrash) {
+        final float trashInterval = 0.5f; // 30 seconds, 0.5 * (60 * 1000) ms
+        conf.setFloat(FS_TRASH_INTERVAL_KEY, trashInterval);
+        FileSystem fs = SecurityUtil.doAsLoginUser(
+            (PrivilegedExceptionAction<FileSystem>)
+                () -> new TrashOzoneFileSystem(om));
+        keyLifecycleService.setOzoneTrash(new OzoneTrash(fs, conf, om));
+      }
+
       GenericTestUtils.setLogLevel(KeyLifecycleService.getLog(), Level.DEBUG);
       GenericTestUtils.LogCapturer log =
           GenericTestUtils.LogCapturer.captureLogs(
@@ -1256,14 +1292,124 @@ void testListMaxSize(BucketLayout bucketLayout) throws IOException,
       ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
       createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix, null, date.toString(), true);
 
+      if (enableTrash && bucketLayout != OBJECT_STORE) {
+        GenericTestUtils.waitFor(() ->
+            (metrics.getNumKeyRenamed().value() - initialRenamedKeyCount) == keyCount, WAIT_CHECK_INTERVAL, 5000);
+        assertEquals(0, getDeletedKeyCount() - initialDeletedKeyCount);
+      } else {
+        GenericTestUtils.waitFor(() ->
+            (getDeletedKeyCount() - initialDeletedKeyCount) == keyCount, WAIT_CHECK_INTERVAL, 5000);
+        assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
+      }
       GenericTestUtils.waitFor(() ->
-          (getDeletedKeyCount() - initialDeletedKeyCount) == keyCount, WAIT_CHECK_INTERVAL, 5000);
-      assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
-      GenericTestUtils.waitFor(() ->
-          log.getOutput().contains("LimitedSizeList has reached maximum size " + 100), WAIT_CHECK_INTERVAL, 5000);
+          log.getOutput().contains("LimitedSizeList has reached maximum size " + maxListSize),
+          WAIT_CHECK_INTERVAL, 5000);
+          WAIT_CHECK_INTERVAL, 5000);
       GenericTestUtils.setLogLevel(KeyLifecycleService.getLog(), Level.INFO);
       deleteLifecyclePolicy(volumeName, bucketName);
     }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "LEGACY"})
+    void testMoveToTrash(BucketLayout bucketLayout) throws IOException,
+        TimeoutException, InterruptedException {
+      final String volumeName = getTestName();
+      final String bucketName = uniqueObjectName("bucket");
+      String prefix = "key";
+      long initialDeletedKeyCount = getDeletedKeyCount();
+      long initialKeyCount = getKeyCount(bucketLayout);
+      long initialRenamedKeyCount = metrics.getNumKeyRenamed().value();
+      // create keys
+      String bucketOwner = UserGroupInformation.getCurrentUser().getShortUserName() + "-test";
+      List<OmKeyArgs> keyList =
+          createKeys(volumeName, bucketName, bucketLayout, bucketOwner, KEY_COUNT, 1, prefix, null);
+      // check there are keys in keyTable
+      Thread.sleep(SERVICE_INTERVAL);
+      assertEquals(KEY_COUNT, keyList.size());
+      GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) - initialKeyCount == KEY_COUNT,
+          WAIT_CHECK_INTERVAL, 1000);
+
+      // enable trash
+      final float trashInterval = 0.5f; // 30 seconds, 0.5 * (60 * 1000) ms
+      conf.setFloat(FS_TRASH_INTERVAL_KEY, trashInterval);
+      FileSystem fs = SecurityUtil.doAsLoginUser(
+          (PrivilegedExceptionAction<FileSystem>)
+              () -> new TrashOzoneFileSystem(om));
+      keyLifecycleService.setOzoneTrash(new OzoneTrash(fs, conf, om));
+
+      // create Lifecycle configuration
+      ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+      ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+      createLifecyclePolicy(volumeName, bucketName, bucketLayout, "", null, date.toString(), true);
+
+      GenericTestUtils.waitFor(() ->
+          (metrics.getNumKeyRenamed().value() - initialRenamedKeyCount) == KEY_COUNT, WAIT_CHECK_INTERVAL, 50000);
+      assertEquals(0, getDeletedKeyCount() - initialDeletedKeyCount);
+      deleteLifecyclePolicy(volumeName, bucketName);
+      // verify trash directory has the right native ACLs
+      List<KeyInfoWithVolumeContext> dirList = new ArrayList<>();
+      if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
+        dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX));
+        dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX + OM_KEY_PREFIX + bucketOwner));
+        dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX + OM_KEY_PREFIX + bucketOwner +
+            OM_KEY_PREFIX + CURRENT));
+      } else {
+        dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX + OM_KEY_PREFIX));
+        dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX + OM_KEY_PREFIX + bucketOwner + OM_KEY_PREFIX));
+        dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX + OM_KEY_PREFIX + bucketOwner +
+            OM_KEY_PREFIX + CURRENT + OM_KEY_PREFIX));
+      }
+      for (KeyInfoWithVolumeContext dir : dirList) {
+        List<OzoneAcl> aclList = dir.getKeyInfo().getAcls();
+        for (OzoneAcl acl : aclList) {
+          if (acl.getType() == IAccessAuthorizer.ACLIdentityType.USER ||
+              acl.getType() == IAccessAuthorizer.ACLIdentityType.GROUP) {
+            assertEquals(bucketOwner, acl.getName());
+            assertTrue(acl.getAclList().contains(ALL));
+          }
+        }
+      }
+
+      GenericTestUtils.LogCapturer log =
+          GenericTestUtils.LogCapturer.captureLogs(
+              LoggerFactory.getLogger(KeyLifecycleService.class));
+
+      // keys under the trash directory are counted in getKeyCount()
+      if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
+        assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+      } else {
+        // For a legacy bucket, the trash directories along .Trash/user-test/Current are in the key table too.
+        assertEquals(KEY_COUNT + 3, getKeyCount(bucketLayout) - initialKeyCount);
+      }
+      // create a new policy to verify that a rule whose prefix is ".Trash/" is ignored during lifecycle evaluation
+      now = ZonedDateTime.now(ZoneOffset.UTC);
+      date = now.plusSeconds(EXPIRE_SECONDS);
+      createLifecyclePolicy(volumeName, bucketName, bucketLayout, TRASH_PREFIX + OM_KEY_PREFIX,
+          null, date.toString(), true);
+
+      GenericTestUtils.waitFor(
+          () -> log.getOutput().contains("Skip rule") &&
+              log.getOutput().contains("as its prefix starts with " + TRASH_PREFIX + OM_KEY_PREFIX),
+          WAIT_CHECK_INTERVAL, 5000);
+      deleteLifecyclePolicy(volumeName, bucketName);
+
+      // create a new policy to verify that a rule whose prefix is ".Trash" is ignored during lifecycle evaluation
+      now = ZonedDateTime.now(ZoneOffset.UTC);
+      date = now.plusSeconds(EXPIRE_SECONDS);
+      createLifecyclePolicy(volumeName, bucketName, bucketLayout, TRASH_PREFIX, null, date.toString(), true);
+
+      GenericTestUtils.waitFor(
+          () -> log.getOutput().contains("Skip evaluating trash directory " + TRASH_PREFIX), WAIT_CHECK_INTERVAL, 5000);
+      deleteLifecyclePolicy(volumeName, bucketName);
+
+      // create a new policy to verify that the trash directory is skipped during lifecycle evaluation
+      now = ZonedDateTime.now(ZoneOffset.UTC);
+      date = now.plusSeconds(EXPIRE_SECONDS);
+      createLifecyclePolicy(volumeName, bucketName, bucketLayout, "", null, date.toString(), true);
+
+      GenericTestUtils.waitFor(
+          () -> log.getOutput().contains("No expired keys/dirs found or remaining for bucket"), WAIT_CHECK_INTERVAL, 5000);
+      deleteLifecyclePolicy(volumeName, bucketName);
+    }
   }
 
   /**
@@ -1319,7 +1465,8 @@ void testBucketDeleted(BucketLayout bucketLayout) throws IOException, Interrupte
       GenericTestUtils.LogCapturer log =
           GenericTestUtils.LogCapturer.captureLogs(
               LoggerFactory.getLogger(KeyLifecycleService.class));
-      createVolumeAndBucket(volumeName, bucketName, bucketLayout, false);
+      createVolumeAndBucket(volumeName, bucketName, bucketLayout,
+          UserGroupInformation.getCurrentUser().getShortUserName());
       assertNotNull(writeClient.getBucketInfo(volumeName, bucketName));
 
       // create Lifecycle configuration
@@ -1462,8 +1609,15 @@ void testUnsupportedPrefixForFSO(String prefix, boolean createPrefix) {
 
   private List<OmKeyArgs> createKeys(String volume, String bucket, BucketLayout bucketLayout,
       int keyCount, int numBlocks, String keyPrefix, Map<String, String> tags) throws IOException {
+    return createKeys(volume, bucket, bucketLayout, UserGroupInformation.getCurrentUser().getShortUserName(),
+        keyCount, numBlocks, keyPrefix, tags);
+  }
+
+  @SuppressWarnings("parameternumber")
+  private List<OmKeyArgs> createKeys(String volume, String bucket, BucketLayout bucketLayout, String owner,
+      int keyCount, int numBlocks, String keyPrefix, Map<String, String> tags) throws IOException {
     // Create Volume and Bucket
-    createVolumeAndBucket(volume, bucket, bucketLayout, false);
+    createVolumeAndBucket(volume, bucket, bucketLayout, owner);
     List<OmKeyArgs> keyList = new ArrayList<>();
     for (int x = 0; x < keyCount; x++) {
       final String keyName = uniqueObjectName(keyPrefix);
@@ -1543,7 +1697,7 @@ private void deleteLifecyclePolicy(String volume, String bucket)
   }
 
   private void createVolumeAndBucket(String volumeName,
-      String bucketName, BucketLayout bucketLayout, boolean isVersioningEnabled) throws IOException {
+      String bucketName, BucketLayout bucketLayout, String owner) throws IOException {
     // cheat here, just create a volume and bucket entry so that we can
     // create the keys, we put the same data for key and value since the
     // system does not decode the object
@@ -1558,8 +1712,8 @@ private void createVolumeAndBucket(String volumeName,
     OMRequestTestUtils.addBucketToOM(keyManager.getMetadataManager(),
         OmBucketInfo.newBuilder().setVolumeName(volumeName)
             .setBucketName(bucketName)
-            .setIsVersionEnabled(isVersioningEnabled)
             .setBucketLayout(bucketLayout)
+            .setOwner(owner)
             .setObjectID(OBJECT_ID_COUNTER.incrementAndGet())
             .build());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
