This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-8342
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-8342 by this push:
new d7077ea0ce6 HDDS-12789. Support move expired key to trash if trash is
enabled (#8974)
d7077ea0ce6 is described below
commit d7077ea0ce639c61e675a8795250320b0d7659cb
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Sep 16 02:29:26 2025 +0800
HDDS-12789. Support move expired key to trash if trash is enabled (#8974)
---
.../apache/hadoop/fs/ozone/OzoneTrashPolicy.java | 2 +-
.../src/main/proto/OmClientProtocol.proto | 1 +
.../file/OMDirectoryCreateRequestWithFSO.java | 4 +-
.../om/request/key/OMKeyRenameRequestWithFSO.java | 8 +
.../ozone/om/service/KeyLifecycleService.java | 337 +++++++++++++++++----
.../om/service/KeyLifecycleServiceMetrics.java | 25 +-
.../ozone/om/service/TestKeyLifecycleService.java | 186 +++++++++++-
7 files changed, 482 insertions(+), 81 deletions(-)
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/fs/ozone/OzoneTrashPolicy.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/fs/ozone/OzoneTrashPolicy.java
index f800d76b578..ffcde6e65a0 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/fs/ozone/OzoneTrashPolicy.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/fs/ozone/OzoneTrashPolicy.java
@@ -46,7 +46,7 @@ public class OzoneTrashPolicy extends TrashPolicyDefault {
private static final Logger LOG =
LoggerFactory.getLogger(OzoneTrashPolicy.class);
- protected static final Path CURRENT = new Path("Current");
+ public static final Path CURRENT = new Path("Current");
protected static final int MSECS_PER_MINUTE = 60 * 1000;
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 1a83bf4ff58..027f03a4ad7 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1352,6 +1352,7 @@ message RenameKeysResponse{
message RenameKeyRequest{
required KeyArgs keyArgs = 1;
required string toKeyName = 2;
+ optional uint64 updateID = 3;
}
message RenameKeyResponse{
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
index 3ad7cbd17fe..9ebcccf460d 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMDirectoryCreateRequestWithFSO.java
@@ -210,13 +210,13 @@ private void logResult(CreateDirectoryRequest
createDirectoryRequest,
break;
case DIRECTORY_ALREADY_EXISTS:
if (LOG.isDebugEnabled()) {
- LOG.debug("Directory already exists. Volume:{}, Bucket:{}, Key{}",
+ LOG.debug("Directory already exists. Volume:{}, Bucket:{}, Key:{}",
volumeName, bucketName, keyName, exception);
}
break;
case FAILURE:
omMetrics.incNumCreateDirectoryFails();
- LOG.error("Directory creation failed. Volume:{}, Bucket:{}, Key{}. " +
+ LOG.error("Directory creation failed. Volume:{}, Bucket:{}, Key:{}. " +
"Exception:{}", volumeName, bucketName, keyName, exception);
break;
default:
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
index cbab201b55e..0e8fd17e7bb 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRenameRequestWithFSO.java
@@ -121,6 +121,14 @@ public OMClientResponse
validateAndUpdateCache(OzoneManager ozoneManager, Execut
throw new OMException("Key not found " + fromKeyName, KEY_NOT_FOUND);
}
+ if (renameKeyRequest.hasUpdateID()) {
+ if (fromKeyFileStatus.getKeyInfo().getUpdateID() !=
renameKeyRequest.getUpdateID()) {
+ throw new OMException("UpdateID does not match. Key: " + fromKeyName
+
+ ", Expected UpdateID: " +
fromKeyFileStatus.getKeyInfo().getUpdateID() +
+ ", Given UpdateID: " + renameKeyRequest.getUpdateID(),
OMException.ResultCodes.UPDATE_ID_NOT_MATCH);
+ }
+ }
+
if (fromKeyFileStatus.getKeyInfo().isHsync()) {
throw new OMException("Open file cannot be renamed since it is " +
"hsync'ed: volumeName=" + volumeName + ", bucketName=" +
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
index 80d267cc84f..09b0c55efee 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
@@ -17,17 +17,21 @@
package org.apache.hadoop.ozone.om.service;
+import static org.apache.hadoop.fs.FileSystem.TRASH_PREFIX;
+import static org.apache.hadoop.fs.ozone.OzoneTrashPolicy.CURRENT;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED_DEFAULT;
+import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.nio.file.Paths;
+import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -36,7 +40,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.BackgroundService;
@@ -58,15 +61,22 @@
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmLCRule;
import org.apache.hadoop.ozone.om.helpers.OmLifecycleConfiguration;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateDirectoryRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyArgs;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeyError;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.DeleteKeysRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.KeyArgs;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RenameKeyRequest;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.ClientId;
import org.slf4j.Logger;
@@ -214,6 +224,8 @@ public final class LifecycleActionTask implements
BackgroundTask {
private long numDirDeleted = 0;
private long numKeyDeleted = 0;
private long sizeKeyDeleted = 0;
+ private long numKeyRenamed = 0;
+ private long sizeKeyRenamed = 0;
public LifecycleActionTask(OmLifecycleConfiguration lcConfig) {
this.policy = lcConfig;
@@ -293,23 +305,23 @@ public BackgroundTaskResult call() {
}
if (expiredKeyList.isEmpty() && expiredDirList.isEmpty()) {
- LOG.info("No expired keys/dirs found for bucket {}", bucketKey);
+ LOG.info("No expired keys/dirs found/remained for bucket {}",
bucketKey);
onSuccess(bucketKey);
return result;
}
- LOG.info("{} expired keys and {} expired dirs found for bucket {}",
+ LOG.info("{} expired keys and {} expired dirs found and remained for
bucket {}",
expiredKeyList.size(), expiredDirList.size(), bucketKey);
// If trash is enabled, move files to trash, instead of send delete
requests.
// OBS bucket doesn't support trash.
- if (bucket.getBucketLayout() == BucketLayout.OBJECT_STORE) {
+ if (bucket.getBucketLayout() == OBJECT_STORE) {
sendDeleteKeysRequestAndClearList(bucket.getVolumeName(),
bucket.getBucketName(),
expiredKeyList, false);
} else if (ozoneTrash != null) {
// move keys to trash
- // TODO: add unit test in next patch
- moveKeysToTrash(bucket.getVolumeName(), bucket.getBucketName(),
expiredKeyList);
+ // TODO: move directory to trash in next patch
+ moveKeysToTrash(bucket, expiredKeyList);
} else {
sendDeleteKeysRequestAndClearList(bucket.getVolumeName(),
bucket.getBucketName(), expiredKeyList, false);
if (!expiredDirList.isEmpty()) {
@@ -327,6 +339,9 @@ public BackgroundTaskResult call() {
private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket,
String bucketKey,
Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
LimitedExpiredObjectList expiredKeyList, LimitedExpiredObjectList
expiredDirList) {
+ List<OmLCRule> prefixStartsWithTrashRuleList =
+ ruleList.stream().filter(r -> r.isPrefixEnable() &&
r.getEffectivePrefix().startsWith(
+ TRASH_PREFIX +
OzoneConsts.OM_KEY_PREFIX)).collect(Collectors.toList());
List<OmLCRule> directoryStylePrefixRuleList =
ruleList.stream().filter(r ->
r.isDirectoryStylePrefix()).collect(Collectors.toList());
List<OmLCRule> nonDirectoryStylePrefixRuleList =
@@ -335,6 +350,11 @@ private void evaluateFSOBucket(OmVolumeArgs volume,
OmBucketInfo bucket, String
List<OmLCRule> noPrefixRuleList =
ruleList.stream().filter(r ->
!r.isPrefixEnable()).collect(Collectors.toList());
+ directoryStylePrefixRuleList.removeAll(prefixStartsWithTrashRuleList);
+ nonDirectoryStylePrefixRuleList.removeAll(prefixStartsWithTrashRuleList);
+ prefixStartsWithTrashRuleList.stream().forEach(
+ r -> LOG.info("Skip rule {} as its prefix starts with {}", r,
TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX));
+
Table<String, OmDirectoryInfo> directoryInfoTable =
omMetadataManager.getDirectoryTable();
for (OmLCRule rule : directoryStylePrefixRuleList) {
// find KeyInfo of each directory for prefix
@@ -360,11 +380,10 @@ private void evaluateFSOBucket(OmVolumeArgs volume,
OmBucketInfo bucket, String
directoryPath.toString().equals(rule.getEffectivePrefix() +
OM_KEY_PREFIX))
&& rule.match(lastDir, directoryPath.toString())) {
if (expiredDirList.isFull()) {
- // if expiredDirList is full, send delete request for pending
deletion directories
- sendDeleteKeysRequestAndClearList(volume.getVolume(),
bucket.getBucketName(),
- expiredDirList, true);
+ // if expiredDirList is full, send delete/rename request for
expired directories
+ handleAndClearFullList(bucket, expiredDirList, true);
}
- expiredDirList.add(directoryPath.toString(),
lastDir.getUpdateID());
+ expiredDirList.add(directoryPath.toString(), 0,
lastDir.getUpdateID());
}
}
@@ -380,14 +399,18 @@ private void evaluateFSOBucket(OmVolumeArgs volume,
OmBucketInfo bucket, String
String prefix = OM_KEY_PREFIX + volume.getObjectID() +
OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
if (dirInfo != null) {
- prefix += dirInfo.getObjectID();
- if (dirInfo.getName().equals(rule.getEffectivePrefix()) &&
rule.match(dirInfo, dirInfo.getName())) {
- if (expiredDirList.isFull()) {
- // if expiredDirList is full, send delete request for pending
deletion directories
- sendDeleteKeysRequestAndClearList(volume.getVolume(),
bucket.getBucketName(),
- expiredDirList, true);
+ if (!dirInfo.getName().equals(TRASH_PREFIX)) {
+ prefix += dirInfo.getObjectID();
+ if (dirInfo.getName().equals(rule.getEffectivePrefix()) &&
rule.match(dirInfo, dirInfo.getName())) {
+ if (expiredDirList.isFull()) {
+ // if expiredDirList is full, send delete/rename request for
expired directories
+ handleAndClearFullList(bucket, expiredDirList, true);
+ }
+ expiredDirList.add(dirInfo.getName(), 0, dirInfo.getUpdateID());
}
- expiredDirList.add(dirInfo.getName(), dirInfo.getUpdateID());
+ } else {
+ dirInfo = null;
+ LOG.info("Skip evaluate trash directory {}", TRASH_PREFIX);
}
}
LOG.info("Prefix {} for {}", prefix, bucketKey);
@@ -412,12 +435,10 @@ private void evaluateFSOBucket(OmVolumeArgs volume,
OmBucketInfo bucket, String
if (rule.match(key)) {
// mark key as expired, check next key
if (expiredKeyList.isFull()) {
- // if expiredKeyList is full, send delete request for
pending deletion keys
- sendDeleteKeysRequestAndClearList(volume.getVolume(),
bucket.getBucketName(),
- expiredKeyList, false);
+ // if expiredKeyList is full, send delete/rename request for
expired keys
+ handleAndClearFullList(bucket, expiredKeyList, false);
}
- expiredKeyList.add(key.getKeyName(), key.getUpdateID());
- sizeKeyDeleted += key.getReplicatedSize();
+ expiredKeyList.add(key.getKeyName(), key.getReplicatedSize(),
key.getUpdateID());
break;
}
}
@@ -433,15 +454,18 @@ private void evaluateFSOBucket(OmVolumeArgs volume,
OmBucketInfo bucket, String
Table.KeyValue<String, OmDirectoryInfo> entry = dirTblItr.next();
OmDirectoryInfo dir = entry.getValue();
numDirIterated++;
+ // skip TRASH_PREFIX directory
+ if (dir.getName().equals(TRASH_PREFIX)) {
+ continue;
+ }
for (OmLCRule rule : noPrefixRuleList) {
if (rule.match(dir, dir.getPath())) {
// mark directory as expired, check next directory
if (expiredDirList.isFull()) {
- // if expiredDirList is full, send delete request for
pending deletion directories
- sendDeleteKeysRequestAndClearList(volume.getVolume(),
bucket.getBucketName(),
- expiredDirList, true);
+ // if expiredDirList is full, send delete/rename request for
expired directories
+ handleAndClearFullList(bucket, expiredDirList, true);
}
- expiredDirList.add(dir.getPath(), dir.getUpdateID());
+ expiredDirList.add(dir.getPath(), 0, dir.getUpdateID());
break;
}
}
@@ -467,11 +491,10 @@ private void evaluateKeyTable(Table<String, OmKeyInfo>
keyTable, String prefix,
if (rule.match(key, keyPath)) {
// mark key as expired, check next key
if (keyList.isFull()) {
- // if keyList is full, send delete request for pending deletion
keys
- sendDeleteKeysRequestAndClearList(volumeName, bucketName,
keyList, false);
+ // if keyList is full, send delete/rename request for expired
keys
+ handleAndClearFullList(bucket, keyList, false);
}
- keyList.add(keyPath, key.getUpdateID());
- sizeKeyDeleted += key.getReplicatedSize();
+ keyList.add(keyPath, key.getReplicatedSize(), key.getUpdateID());
}
}
} catch (IOException e) {
@@ -489,15 +512,19 @@ private void evaluateDirTable(Table<String,
OmDirectoryInfo> directoryInfoTable,
while (dirTblItr.hasNext()) {
Table.KeyValue<String, OmDirectoryInfo> entry = dirTblItr.next();
OmDirectoryInfo dir = entry.getValue();
- String dirPath = directoryPath + dir.getName();
numDirIterated++;
+ // skip TRASH_PREFIX directory
+ if (dir.getName().equals(TRASH_PREFIX)) {
+ continue;
+ }
+ String dirPath = directoryPath + dir.getName();
if (rule.match(dir, dirPath)) {
// mark dir as expired, check next key
if (dirList.isFull()) {
- // if dirList is full, send delete request for pending deletion
directories
- sendDeleteKeysRequestAndClearList(volumeName, bucketName,
dirList, true);
+ // if dirList is full, send delete/rename request for expired
directories
+ handleAndClearFullList(bucket, dirList, true);
}
- dirList.add(dirPath, dir.getUpdateID());
+ dirList.add(dirPath, 0, dir.getUpdateID());
}
}
} catch (IOException e) {
@@ -510,6 +537,16 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
Table<String, OmKeyInfo> keyTable, List<OmLCRule> ruleList,
LimitedExpiredObjectList expiredKeyList) {
String volumeName = bucketInfo.getVolumeName();
String bucketName = bucketInfo.getBucketName();
+
+ if (bucketInfo.getBucketLayout() == BucketLayout.LEGACY) {
+ List<OmLCRule> prefixStartsWithTrashRuleList =
+ ruleList.stream().filter(r -> r.isPrefixEnable() &&
r.getEffectivePrefix().startsWith(
+ TRASH_PREFIX +
OzoneConsts.OM_KEY_PREFIX)).collect(Collectors.toList());
+ ruleList.removeAll(prefixStartsWithTrashRuleList);
+ prefixStartsWithTrashRuleList.stream().forEach(
+ r -> LOG.info("Skip rule {} as its prefix starts with {}", r,
TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX));
+ }
+
// use bucket name as key iterator prefix
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>>
keyTblItr =
keyTable.iterator(omMetadataManager.getBucketKey(volumeName,
bucketName))) {
@@ -517,15 +554,19 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
Table.KeyValue<String, OmKeyInfo> keyValue = keyTblItr.next();
OmKeyInfo key = keyValue.getValue();
numKeyIterated++;
+ if (bucketInfo.getBucketLayout() == BucketLayout.LEGACY &&
+ key.getKeyName().startsWith(TRASH_PREFIX +
OzoneConsts.OM_KEY_PREFIX)) {
+ LOG.info("Skip evaluate trash directory {} and all its child files
and sub directories", TRASH_PREFIX);
+ continue;
+ }
for (OmLCRule rule : ruleList) {
if (rule.match(key)) {
// mark key as expired, check next key
if (expiredKeyList.isFull()) {
- // if expiredKeyList is full, send delete request for pending
deletion keys
- sendDeleteKeysRequestAndClearList(volumeName, bucketName,
expiredKeyList, false);
+ // if expiredKeyList is full, send delete/rename request for
expired keys
+ handleAndClearFullList(bucketInfo, expiredKeyList, false);
}
- expiredKeyList.add(key.getKeyName(), key.getUpdateID());
- sizeKeyDeleted += key.getReplicatedSize();
+ expiredKeyList.add(key.getKeyName(), key.getReplicatedSize(),
key.getUpdateID());
break;
}
}
@@ -551,7 +592,7 @@ private OmDirectoryInfo getDirectory(OmVolumeArgs volume,
OmBucketInfo bucket, S
* If prefix is /dir1/dir2, but dir1 doesn't exist, then it will return
exception.
* If prefix is /dir1/dir2, but dir2 doesn't exist, then it will return a
list with dir1 only.
* If prefix is /dir1/dir2, although dir1 exists, but get(dir1) failed
with IOException,
- * then it will return exception too.
+ * then it will return exception too.
*/
private List<OmDirectoryInfo> getDirList(OmVolumeArgs volume, OmBucketInfo
bucket, String prefix, String bucketKey)
throws IOException {
@@ -588,6 +629,8 @@ private List<OmDirectoryInfo> getDirList(OmVolumeArgs
volume, OmBucketInfo bucke
private void onFailure(String bucketName) {
inFlight.remove(bucketName);
metrics.incrNumFailureTask();
+ metrics.incNumKeyIterated(numKeyIterated);
+ metrics.incNumDirIterated(numDirIterated);
}
private void onSuccess(String bucketName) {
@@ -597,9 +640,17 @@ private void onSuccess(String bucketName) {
metrics.incTaskLatencyMs(timeSpent);
metrics.incNumKeyIterated(numKeyIterated);
metrics.incNumDirIterated(numDirIterated);
- metrics.incrSizeKeyDeleted(sizeKeyDeleted);
- LOG.info("Spend {} ms on bucket {} to iterate {} keys and {} dirs,
deleted {} keys with {} bytes, and {} dirs",
- timeSpent, bucketName, numKeyIterated, numDirIterated,
numKeyDeleted, sizeKeyDeleted, numDirDeleted);
+ LOG.info("Spend {} ms on bucket {} to iterate {} keys and {} dirs,
deleted {} keys with {} bytes, " +
+ "and {} dirs, renamed {} keys with {} bytes to trash", timeSpent,
bucketName, numKeyIterated, numDirIterated,
+ numKeyDeleted, sizeKeyDeleted, numDirDeleted, numKeyRenamed,
sizeKeyRenamed);
+ }
+
+ private void handleAndClearFullList(OmBucketInfo bucket,
LimitedExpiredObjectList keysList, boolean dir) {
+ if (bucket.getBucketLayout() != OBJECT_STORE && ozoneTrash != null) {
+ moveKeysToTrash(bucket, keysList);
+ } else {
+ sendDeleteKeysRequestAndClearList(bucket.getVolumeName(),
bucket.getBucketName(), keysList, dir);
+ }
}
private void sendDeleteKeysRequestAndClearList(String volume, String
bucket,
@@ -641,46 +692,186 @@ private void sendDeleteKeysRequestAndClearList(String
volume, String bucket,
getOzoneManager(), omRequest, clientId,
callId.getAndIncrement());
long endTime = System.nanoTime();
LOG.debug("DeleteKeys request with {} keys cost {} ns", keyCount,
endTime - startTime);
- i += batchSize;
- startIndex += batchSize;
+ long deletedCount = keyCount;
+ long deletedSize = keysList.replicatedSizeSubList(startIndex,
endIndex)
+ .stream().mapToLong(Long::longValue).sum();
if (response != null) {
if (!response.getSuccess()) {
// log the failure and continue the iterating
- LOG.error("DeleteKeys request failed with " +
response.getMessage() +
- " for volume: {}, bucket: {}", volume, bucket);
- continue;
+ LOG.error("DeleteKeys request " + response.getStatus() + "
failed for volume: {}, bucket: {}",
+ volume, bucket);
+ if (response.getDeleteKeysResponse().hasUnDeletedKeys()) {
+ DeleteKeyArgs unDeletedKeys =
response.getDeleteKeysResponse().getUnDeletedKeys();
+ for (String key : unDeletedKeys.getKeysList()) {
+ Long size = keysList.getReplicatedSize(key);
+ if (size == null) {
+ LOG.error("Undeleted key {}/{}/{} doesn't in keyLists",
volume, bucket, key);
+ continue;
+ }
+ deletedCount -= 1;
+ deletedSize -= size;
+ }
+ }
+ for (DeleteKeyError e :
response.getDeleteKeysResponse().getErrorsList()) {
+ Long size = keysList.getReplicatedSize(e.getKey());
+ if (size == null) {
+ LOG.error("Deleted error key {}/{}/{} doesn't in
keyLists", volume, bucket, e.getKey());
+ continue;
+ }
+ deletedCount -= 1;
+ deletedSize -= size;
+ }
} else {
LOG.debug("DeleteKeys request of total {} keys, {} not
deleted", keyCount,
response.getDeleteKeysResponse().getErrorsCount());
}
}
if (dir) {
- numDirDeleted += keyCount;
- metrics.incrNumDirDeleted(keyCount);
+ numDirDeleted += deletedCount;
+ metrics.incrNumDirDeleted(deletedCount);
} else {
- numKeyDeleted += keyCount;
- metrics.incrNumKeyDeleted(keyCount);
+ numKeyDeleted += deletedCount;
+ sizeKeyDeleted += deletedSize;
+ metrics.incrNumKeyDeleted(deletedCount);
+ metrics.incrSizeKeyDeleted(deletedSize);
}
+ i += keyCount;
+ startIndex += keyCount;
} else {
batchSize /= 2;
}
}
- keysList.clear();
} catch (ServiceException e) {
LOG.error("Failed to send DeleteKeysRequest", e);
+ } finally {
+ keysList.clear();
}
}
- private void moveKeysToTrash(String volume, String bucket,
LimitedExpiredObjectList keysList) {
- for (int index = 0; index < keysList.size(); index++) {
+ private void moveKeysToTrash(OmBucketInfo bucket, LimitedExpiredObjectList
keysList) {
+ if (keysList.isEmpty()) {
+ return;
+ }
+ String volumeName = bucket.getVolumeName();
+ String bucketName = bucket.getBucketName();
+ String trashCurrent;
+ UserGroupInformation ugi =
UserGroupInformation.createRemoteUser(bucket.getOwner());
+ try {
+ trashCurrent = checkAndCreateTrashDirectoryIfNeeded(bucket, ugi);
+ } catch (IOException e) {
+ keysList.clear();
+ return;
+ }
+
+ for (int i = 0; i < keysList.size(); i++) {
+ String keyName = keysList.getName(i);
+ String targetKeyName = trashCurrent + OM_KEY_PREFIX + keyName;
+ KeyArgs keyArgs = KeyArgs.newBuilder().setKeyName(keyName)
+ .setVolumeName(volumeName).setBucketName(bucketName).build();
+
+ /**
+ * Trash examples:
+ * /s3v/test/readme -> /s3v/test/.Trash/hadoop/Current/readme
+ * /s3v/test/dir1/readme -> /s3v/test/.Trash/hadoop/Current/dir1/readme
+ */
+ RenameKeyRequest renameKeyRequest = RenameKeyRequest.newBuilder()
+ .setKeyArgs(keyArgs)
+ .setToKeyName(targetKeyName)
+ .setUpdateID(keysList.getUpdateID(i))
+ .build();
+
+ // send request out
+ OMRequest omRequest = OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.RenameKey)
+ .setVersion(ClientVersion.CURRENT_VERSION)
+ .setClientId(clientId.toString())
+ .setRenameKeyRequest(renameKeyRequest)
+ .build();
try {
- ozoneTrash.moveToTrash(new Path(OM_KEY_PREFIX + volume +
OM_KEY_PREFIX + bucket + OM_KEY_PREFIX +
- keysList.getName(index)));
- } catch (IOException e) {
- // log failure and continue
- LOG.warn("Failed to move key {} to trash", keysList.getName(index),
e);
+ // perform preExecute as ratis submit do no perform preExecute
+ OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+ OzoneManagerProtocolProtos.OMResponse omResponse =
+ ugi.doAs(new
PrivilegedExceptionAction<OzoneManagerProtocolProtos.OMResponse>() {
+ @Override
+ public OzoneManagerProtocolProtos.OMResponse run() throws
Exception {
+ OMRequest request = omClientRequest.preExecute(ozoneManager);
+ return
OzoneManagerRatisUtils.submitRequest(getOzoneManager(),
+ request, clientId, callId.getAndIncrement());
+ }
+ });
+ if (omResponse != null) {
+ if (!omResponse.getSuccess()) {
+ // log the failure and continue the iterating
+ LOG.error("RenameKey request failed with source key: {}, dest
key: {}", keyName, targetKeyName);
+ continue;
+ }
+ }
+ LOG.debug("RenameKey request succeed with source key: {}, dest key:
{}", keyName, targetKeyName);
+ numKeyRenamed += 1;
+ sizeKeyRenamed += keysList.getReplicatedSize(i);
+ metrics.incrNumKeyRenamed(1);
+ metrics.incrSizeKeyRenamed(keysList.getReplicatedSize(i));
+ } catch (InterruptedException | IOException e) {
+ LOG.error("Failed to send RenameKeysRequest", e);
}
}
+ keysList.clear();
+ }
+
+ private String checkAndCreateTrashDirectoryIfNeeded(OmBucketInfo bucket,
UserGroupInformation ugi)
+ throws IOException {
+ String userTrashRoot = TRASH_PREFIX + OM_KEY_PREFIX + bucket.getOwner();
+ String userTrashCurrent = userTrashRoot + OM_KEY_PREFIX + CURRENT;
+ try {
+ OmKeyArgs key = new
OmKeyArgs.Builder().setVolumeName(bucket.getVolumeName())
+ .setBucketName(bucket.getBucketName()).setKeyName(userTrashCurrent)
+ .setOwnerName(bucket.getOwner()).build();
+ ozoneManager.getFileStatus(key);
+ return userTrashCurrent;
+ } catch (IOException e) {
+ if (e instanceof OMException &&
+ (((OMException) e).getResult() ==
OMException.ResultCodes.FILE_NOT_FOUND ||
+ ((OMException) e).getResult() ==
OMException.ResultCodes.DIRECTORY_NOT_FOUND)) {
+ // create the trash/Current directory for user
+ KeyArgs keyArgs =
KeyArgs.newBuilder().setVolumeName(bucket.getVolumeName())
+
.setBucketName(bucket.getBucketName()).setKeyName(userTrashCurrent)
+ .setOwnerName(bucket.getOwner()).setRecursive(true).build();
+ OMRequest omRequest =
OMRequest.newBuilder().setCreateDirectoryRequest(
+ CreateDirectoryRequest.newBuilder().setKeyArgs(keyArgs))
+ .setCmdType(OzoneManagerProtocolProtos.Type.CreateDirectory)
+ .setVersion(ClientVersion.CURRENT_VERSION)
+ .setClientId(clientId.toString())
+ .build();
+ try {
+ // perform preExecute as ratis submit do no perform preExecute
+ final OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+ OzoneManagerProtocolProtos.OMResponse omResponse =
+ ugi.doAs(new
PrivilegedExceptionAction<OzoneManagerProtocolProtos.OMResponse>() {
+ @Override
+ public OzoneManagerProtocolProtos.OMResponse run() throws
Exception {
+ OMRequest request =
omClientRequest.preExecute(ozoneManager);
+ return
OzoneManagerRatisUtils.submitRequest(getOzoneManager(),
+ request, clientId, callId.getAndIncrement());
+ }
+ });
+
+ if (omResponse != null) {
+ if (!omResponse.getSuccess()) {
+ LOG.error("CreateDirectory request failed with {}, path: {}",
+ omResponse.getMessage(), userTrashCurrent);
+ throw new IOException("Failed to create trash directory " +
userTrashCurrent);
+ }
+ LOG.debug("Created trash current directory: {}",
userTrashCurrent);
+ return userTrashCurrent;
+ }
+ } catch (InterruptedException | IOException e1) {
+ LOG.error("Failed to send CreateDirectoryRequest for {}",
userTrashCurrent, e1);
+ throw new IOException("Failed to send CreateDirectoryRequest
request for " + userTrashCurrent);
+ }
+ }
+ LOG.error("Failed to get trash current directory {} status",
userTrashCurrent, e);
+ throw e;
+ }
}
}
@@ -704,20 +895,28 @@ public void setListMaxSize(int size) {
this.listMaxSize = size;
}
+ @VisibleForTesting
+ public void setOzoneTrash(OzoneTrash ozoneTrash) {
+ this.ozoneTrash = ozoneTrash;
+ }
+
/**
* An in-memory list with limited size to hold expired object infos,
including object name and current update ID.
*/
public static class LimitedExpiredObjectList {
private final LimitedSizeList<String> objectNames;
+ private final List<Long> objectReplicatedSize;
private final List<Long> objectUpdateIDs;
public LimitedExpiredObjectList(int maxListSize) {
this.objectNames = new LimitedSizeList<>(maxListSize);
+ this.objectReplicatedSize = new ArrayList<>();
this.objectUpdateIDs = new ArrayList<>();
}
- public void add(String name, long updateID) {
+ public void add(String name, long size, long updateID) {
objectNames.add(name);
+ objectReplicatedSize.add(size);
objectUpdateIDs.add(updateID);
}
@@ -733,9 +932,23 @@ public List<Long> updateIDSubList(int fromIndex, int
toIndex) {
return objectUpdateIDs.subList(fromIndex, toIndex);
}
+ public List<Long> replicatedSizeSubList(int fromIndex, int toIndex) {
+ return objectReplicatedSize.subList(fromIndex, toIndex);
+ }
+
+ public Long getReplicatedSize(String keyName) {
+ for (int index = 0; index < objectNames.size(); index++) {
+ if (objectNames.get(index).equals(keyName)) {
+ return objectReplicatedSize.get(index);
+ }
+ }
+ return null;
+ }
+
public void clear() {
objectNames.clear();
objectUpdateIDs.clear();
+ objectReplicatedSize.clear();
}
public boolean isEmpty() {
@@ -753,6 +966,10 @@ public String getName(int index) {
public long getUpdateID(int index) {
return objectUpdateIDs.get(index);
}
+
+ public long getReplicatedSize(int index) {
+ return objectReplicatedSize.get(index);
+ }
}
/**
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
index 104be2995cc..77efe4f2287 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
@@ -70,16 +70,21 @@ public static void unregister() {
@Metric("Execution time of a success task for a bucket")
private MutableRate taskLatencyMs;
- @Metric("Number of key iterated of a success task for a bucket")
+ // following metrics are updated by both success and failure tasks
+ @Metric("Number of key iterated")
private MutableGaugeLong numKeyIterated;
- @Metric("Number of dir iterated of a success task for a bucket")
+ @Metric("Number of dir iterated")
private MutableGaugeLong numDirIterated;
@Metric("Total directories deleted")
private MutableGaugeLong numDirDeleted;
@Metric("Total keys deleted")
private MutableGaugeLong numKeyDeleted;
+ @Metric("Total keys renamed")
+ private MutableGaugeLong numKeyRenamed;
@Metric("Total size of keys deleted")
private MutableGaugeLong sizeKeyDeleted;
+ @Metric("Total size of keys renamed")
+ private MutableGaugeLong sizeKeyRenamed;
public void incrNumSkippedTask() {
numSkippedTask.incr();
@@ -101,10 +106,18 @@ public void incrNumKeyDeleted(long keyCount) {
numKeyDeleted.incr(keyCount);
}
+ public void incrNumKeyRenamed(long keyCount) {
+ numKeyRenamed.incr(keyCount);
+ }
+
public void incrSizeKeyDeleted(long size) {
sizeKeyDeleted.incr(size);
}
+ public void incrSizeKeyRenamed(long size) {
+ sizeKeyRenamed.incr(size);
+ }
+
public MutableGaugeLong getNumDirDeleted() {
return numDirDeleted;
}
@@ -113,10 +126,18 @@ public MutableGaugeLong getNumKeyDeleted() {
return numKeyDeleted;
}
+ public MutableGaugeLong getNumKeyRenamed() {
+ return numKeyRenamed;
+ }
+
public MutableGaugeLong getSizeKeyDeleted() {
return sizeKeyDeleted;
}
+ public MutableGaugeLong getSizeKeyRenamed() {
+ return sizeKeyRenamed;
+ }
+
public MutableGaugeLong getNumDirIterated() {
return numDirIterated;
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
index 7df7e244b76..4a665a28ad0 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
@@ -17,14 +17,20 @@
package org.apache.hadoop.ozone.om.service;
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.fs.FileSystem.TRASH_PREFIX;
+import static org.apache.hadoop.fs.ozone.OzoneTrashPolicy.CURRENT;
import static
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
import static
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
+import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
+import static
org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -38,6 +44,7 @@
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.ArrayList;
@@ -53,6 +60,7 @@
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -67,7 +75,9 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmTestManagers;
import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.OzoneTrash;
import org.apache.hadoop.ozone.om.ScmBlockLocationTestingClient;
+import org.apache.hadoop.ozone.om.TrashOzoneFileSystem;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.KeyInfoWithVolumeContext;
@@ -92,6 +102,8 @@
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.OzoneTestBase;
import org.apache.ratis.util.ExitUtils;
@@ -128,7 +140,7 @@ class TestKeyLifecycleService extends OzoneTestBase {
LoggerFactory.getLogger(TestKeyLifecycleService.class);
private static final AtomicInteger OBJECT_COUNTER = new AtomicInteger();
private static final AtomicInteger OBJECT_ID_COUNTER = new AtomicInteger();
- private static final int KEY_COUNT = 10;
+ private static final int KEY_COUNT = 2;
private static final int EXPIRE_SECONDS = 2;
private static final int SERVICE_INTERVAL = 300;
private static final int WAIT_CHECK_INTERVAL = 50;
@@ -194,6 +206,7 @@ void setup(@TempDir File testDir) throws Exception {
@AfterEach
void resume() {
+ keyLifecycleService.setOzoneTrash(null);
}
@AfterAll
@@ -294,7 +307,8 @@ void testOnlyKeyExpired(BucketLayout bucketLayout, boolean
createPrefix) throws
long initialKeyCount = getKeyCount(bucketLayout);
// Create the key
- createVolumeAndBucket(volumeName, bucketName, bucketLayout, false);
+ createVolumeAndBucket(volumeName, bucketName, bucketLayout,
+ UserGroupInformation.getCurrentUser().getShortUserName());
OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName,
uniqueObjectName(prefix), 1, null);
// create Lifecycle configuration
@@ -879,7 +893,8 @@ void testExpireOnlyDirectory(String dirName, String prefix,
int dirDepth, int de
long initialNumDeletedDir = metrics.getNumDirDeleted().value();
// Create the directory
- createVolumeAndBucket(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED,
false);
+ createVolumeAndBucket(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED,
+ UserGroupInformation.getCurrentUser().getShortUserName());
createDirectory(volumeName, bucketName, dirName);
KeyInfoWithVolumeContext keyInfo = getDirectory(volumeName, bucketName,
dirName);
assertFalse(keyInfo.getKeyInfo().isFile());
@@ -1038,7 +1053,7 @@ void testKeyUpdatedShouldNotGetDeleted(BucketLayout
bucketLayout)
KeyLifecycleService.getInjector(0).resume();
GenericTestUtils.waitFor(
- () -> log.getOutput().contains(KEY_COUNT + " expired keys and 0
expired dirs found"),
+ () -> log.getOutput().contains(KEY_COUNT + " expired keys and 0
expired dirs found and remained"),
WAIT_CHECK_INTERVAL, 10000);
OmKeyArgs key =
keyList.get(ThreadLocalRandom.current().nextInt(keyList.size()));
@@ -1227,17 +1242,29 @@ void testPerformanceWithNestedDir(BucketLayout
bucketLayout, String prefix)
deleteLifecyclePolicy(volumeName, bucketName);
}
+ public Stream<Arguments> parameters6() {
+ return Stream.of(
+ arguments("FILE_SYSTEM_OPTIMIZED", true),
+ arguments("FILE_SYSTEM_OPTIMIZED", false),
+ arguments("LEGACY", true),
+ arguments("LEGACY", false),
+ arguments("OBJECT_STORE", true),
+ arguments("OBJECT_STORE", false));
+ }
+
@ParameterizedTest
- @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "OBJECT_STORE"})
- void testListMaxSize(BucketLayout bucketLayout) throws IOException,
+ @MethodSource("parameters6")
+ void testListMaxSize(BucketLayout bucketLayout, boolean enableTrash)
throws IOException,
TimeoutException, InterruptedException {
final String volumeName = getTestName();
final String bucketName = uniqueObjectName("bucket");
String prefix = "key";
long initialDeletedKeyCount = getDeletedKeyCount();
long initialKeyCount = getKeyCount(bucketLayout);
- final int keyCount = 500;
- keyLifecycleService.setListMaxSize(100);
+ long initialRenamedKeyCount = metrics.getNumKeyRenamed().value();
+ final int keyCount = 100;
+ final int maxListSize = 20;
+ keyLifecycleService.setListMaxSize(maxListSize);
// create keys
List<OmKeyArgs> keyList =
createKeys(volumeName, bucketName, bucketLayout, keyCount, 1,
prefix, null);
@@ -1247,6 +1274,15 @@ void testListMaxSize(BucketLayout bucketLayout) throws
IOException,
GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) -
initialKeyCount == keyCount,
WAIT_CHECK_INTERVAL, 1000);
+ if (enableTrash) {
+ final float trashInterval = 0.5f; // 30 seconds, 0.5 * (60 * 1000) ms
+ conf.setFloat(FS_TRASH_INTERVAL_KEY, trashInterval);
+ FileSystem fs = SecurityUtil.doAsLoginUser(
+ (PrivilegedExceptionAction<FileSystem>)
+ () -> new TrashOzoneFileSystem(om));
+ keyLifecycleService.setOzoneTrash(new OzoneTrash(fs, conf, om));
+ }
+
GenericTestUtils.setLogLevel(KeyLifecycleService.getLog(), Level.DEBUG);
GenericTestUtils.LogCapturer log =
GenericTestUtils.LogCapturer.captureLogs(
@@ -1256,14 +1292,124 @@ void testListMaxSize(BucketLayout bucketLayout) throws
IOException,
ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
createLifecyclePolicy(volumeName, bucketName, bucketLayout, prefix,
null, date.toString(), true);
+ if (enableTrash && bucketLayout != OBJECT_STORE) {
+ GenericTestUtils.waitFor(() ->
+ (metrics.getNumKeyRenamed().value() - initialRenamedKeyCount) ==
keyCount, WAIT_CHECK_INTERVAL, 5000);
+ assertEquals(0, getDeletedKeyCount() - initialDeletedKeyCount);
+ } else {
+ GenericTestUtils.waitFor(() ->
+ (getDeletedKeyCount() - initialDeletedKeyCount) == keyCount,
WAIT_CHECK_INTERVAL, 5000);
+ assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
+ }
GenericTestUtils.waitFor(() ->
- (getDeletedKeyCount() - initialDeletedKeyCount) == keyCount,
WAIT_CHECK_INTERVAL, 5000);
- assertEquals(0, getKeyCount(bucketLayout) - initialKeyCount);
- GenericTestUtils.waitFor(() ->
- log.getOutput().contains("LimitedSizeList has reached maximum size "
+ 100), WAIT_CHECK_INTERVAL, 5000);
+ log.getOutput().contains("LimitedSizeList has reached maximum size "
+ maxListSize),
+ WAIT_CHECK_INTERVAL, 5000);
GenericTestUtils.setLogLevel(KeyLifecycleService.getLog(), Level.INFO);
deleteLifecyclePolicy(volumeName, bucketName);
}
+
+ @ParameterizedTest
+ @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "LEGACY"})
+ void testMoveToTrash(BucketLayout bucketLayout) throws IOException,
+ TimeoutException, InterruptedException {
+ final String volumeName = getTestName();
+ final String bucketName = uniqueObjectName("bucket");
+ String prefix = "key";
+ long initialDeletedKeyCount = getDeletedKeyCount();
+ long initialKeyCount = getKeyCount(bucketLayout);
+ long initialRenamedKeyCount = metrics.getNumKeyRenamed().value();
+ // create keys
+ String bucketOwner =
UserGroupInformation.getCurrentUser().getShortUserName() + "-test";
+ List<OmKeyArgs> keyList =
+ createKeys(volumeName, bucketName, bucketLayout, bucketOwner,
KEY_COUNT, 1, prefix, null);
+ // check there are keys in keyTable
+ Thread.sleep(SERVICE_INTERVAL);
+ assertEquals(KEY_COUNT, keyList.size());
+ GenericTestUtils.waitFor(() -> getKeyCount(bucketLayout) -
initialKeyCount == KEY_COUNT,
+ WAIT_CHECK_INTERVAL, 1000);
+
+ // enabled trash
+ final float trashInterval = 0.5f; // 30 seconds, 0.5 * (60 * 1000) ms
+ conf.setFloat(FS_TRASH_INTERVAL_KEY, trashInterval);
+ FileSystem fs = SecurityUtil.doAsLoginUser(
+ (PrivilegedExceptionAction<FileSystem>)
+ () -> new TrashOzoneFileSystem(om));
+ keyLifecycleService.setOzoneTrash(new OzoneTrash(fs, conf, om));
+
+ // create Lifecycle configuration
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, "", null,
date.toString(), true);
+
+ GenericTestUtils.waitFor(() ->
+ (metrics.getNumKeyRenamed().value() - initialRenamedKeyCount) ==
KEY_COUNT, WAIT_CHECK_INTERVAL, 50000);
+ assertEquals(0, getDeletedKeyCount() - initialDeletedKeyCount);
+ deleteLifecyclePolicy(volumeName, bucketName);
+ // verify trash directory has the right native ACLs
+ List<KeyInfoWithVolumeContext> dirList = new ArrayList<>();
+ if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
+ dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX));
+ dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX +
OM_KEY_PREFIX + bucketOwner));
+ dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX +
OM_KEY_PREFIX + bucketOwner +
+ OM_KEY_PREFIX + CURRENT));
+ } else {
+ dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX +
OM_KEY_PREFIX));
+ dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX +
OM_KEY_PREFIX + bucketOwner + OM_KEY_PREFIX));
+ dirList.add(getDirectory(volumeName, bucketName, TRASH_PREFIX +
OM_KEY_PREFIX + bucketOwner +
+ OM_KEY_PREFIX + CURRENT + OM_KEY_PREFIX));
+ }
+ for (KeyInfoWithVolumeContext dir : dirList) {
+ List<OzoneAcl> aclList = dir.getKeyInfo().getAcls();
+ for (OzoneAcl acl : aclList) {
+ if (acl.getType() == IAccessAuthorizer.ACLIdentityType.USER ||
+ acl.getType() == IAccessAuthorizer.ACLIdentityType.GROUP) {
+ assertEquals(bucketOwner, acl.getName());
+ assertTrue(acl.getAclList().contains(ALL));
+ }
+ }
+ }
+
+ GenericTestUtils.LogCapturer log =
+ GenericTestUtils.LogCapturer.captureLogs(
+ LoggerFactory.getLogger(KeyLifecycleService.class));
+
+ // keys under trash directory is counted in getKeyCount()
+ if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
+ assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
+ } else {
+ // For legacy bucket, trash directories along .Trash/user-test/Current
are in key table too.
+ assertEquals(KEY_COUNT + 3, getKeyCount(bucketLayout) -
initialKeyCount);
+ }
+ // create new policy to test rule with prefix ".Trash/" is ignored
during lifecycle evaluation
+ now = ZonedDateTime.now(ZoneOffset.UTC);
+ date = now.plusSeconds(EXPIRE_SECONDS);
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, TRASH_PREFIX
+ OM_KEY_PREFIX,
+ null, date.toString(), true);
+
+ GenericTestUtils.waitFor(
+ () -> log.getOutput().contains("Skip rule") &&
+ log.getOutput().contains("as its prefix starts with " +
TRASH_PREFIX + OM_KEY_PREFIX),
+ WAIT_CHECK_INTERVAL, 5000);
+ deleteLifecyclePolicy(volumeName, bucketName);
+
+ // create new policy to test rule with prefix ".Trash" is ignored during
lifecycle evaluation
+ now = ZonedDateTime.now(ZoneOffset.UTC);
+ date = now.plusSeconds(EXPIRE_SECONDS);
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout,
TRASH_PREFIX, null, date.toString(), true);
+
+ GenericTestUtils.waitFor(
+ () -> log.getOutput().contains("Skip evaluate trash directory " +
TRASH_PREFIX), WAIT_CHECK_INTERVAL, 5000);
+ deleteLifecyclePolicy(volumeName, bucketName);
+
+ // create new policy to test trash directory is skipped during lifecycle
evaluation
+ now = ZonedDateTime.now(ZoneOffset.UTC);
+ date = now.plusSeconds(EXPIRE_SECONDS);
+ createLifecyclePolicy(volumeName, bucketName, bucketLayout, "", null,
date.toString(), true);
+
+ GenericTestUtils.waitFor(
+ () -> log.getOutput().contains("No expired keys/dirs found/remained
for bucket"), WAIT_CHECK_INTERVAL, 5000);
+ deleteLifecyclePolicy(volumeName, bucketName);
+ }
}
/**
@@ -1319,7 +1465,8 @@ void testBucketDeleted(BucketLayout bucketLayout) throws
IOException, Interrupte
GenericTestUtils.LogCapturer log =
GenericTestUtils.LogCapturer.captureLogs(
LoggerFactory.getLogger(KeyLifecycleService.class));
- createVolumeAndBucket(volumeName, bucketName, bucketLayout, false);
+ createVolumeAndBucket(volumeName, bucketName, bucketLayout,
+ UserGroupInformation.getCurrentUser().getShortUserName());
assertNotNull(writeClient.getBucketInfo(volumeName, bucketName));
// create Lifecycle configuration
@@ -1462,8 +1609,15 @@ void testUnsupportedPrefixForFSO(String prefix, boolean
createPrefix) {
private List<OmKeyArgs> createKeys(String volume, String bucket,
BucketLayout bucketLayout,
int keyCount, int numBlocks, String keyPrefix, Map<String, String> tags)
throws IOException {
+ return createKeys(volume, bucket, bucketLayout,
UserGroupInformation.getCurrentUser().getShortUserName(),
+ keyCount, numBlocks, keyPrefix, tags);
+ }
+
+ @SuppressWarnings("parameternumber")
+ private List<OmKeyArgs> createKeys(String volume, String bucket,
BucketLayout bucketLayout, String owner,
+ int keyCount, int numBlocks, String keyPrefix, Map<String, String> tags)
throws IOException {
// Create Volume and Bucket
- createVolumeAndBucket(volume, bucket, bucketLayout, false);
+ createVolumeAndBucket(volume, bucket, bucketLayout, owner);
List<OmKeyArgs> keyList = new ArrayList<>();
for (int x = 0; x < keyCount; x++) {
final String keyName = uniqueObjectName(keyPrefix);
@@ -1543,7 +1697,7 @@ private void deleteLifecyclePolicy(String volume, String
bucket)
}
private void createVolumeAndBucket(String volumeName,
- String bucketName, BucketLayout bucketLayout, boolean
isVersioningEnabled) throws IOException {
+ String bucketName, BucketLayout bucketLayout, String owner) throws
IOException {
// cheat here, just create a volume and bucket entry so that we can
// create the keys, we put the same data for key and value since the
// system does not decode the object
@@ -1558,8 +1712,8 @@ private void createVolumeAndBucket(String volumeName,
OMRequestTestUtils.addBucketToOM(keyManager.getMetadataManager(),
OmBucketInfo.newBuilder().setVolumeName(volumeName)
.setBucketName(bucketName)
- .setIsVersionEnabled(isVersioningEnabled)
.setBucketLayout(bucketLayout)
+ .setOwner(owner)
.setObjectID(OBJECT_ID_COUNTER.incrementAndGet())
.build());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]