This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-8342
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-8342 by this push:
new 6542b31ff8a HDDS-12793. Recursive iterate FSO directory. (#9042)
6542b31ff8a is described below
commit 6542b31ff8a4b21f8faf34614f8c53463d828bc0
Author: Sammi Chen <[email protected]>
AuthorDate: Tue Nov 4 12:16:54 2025 +0800
HDDS-12793. Recursive iterate FSO directory. (#9042)
---
.../common/src/main/resources/ozone-default.xml | 10 +-
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 3 +
.../ozone/om/service/KeyLifecycleService.java | 599 ++++++++++++++-------
.../om/service/KeyLifecycleServiceMetrics.java | 10 +
.../ozone/om/service/TestKeyLifecycleService.java | 140 ++++-
5 files changed, 573 insertions(+), 189 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 3990f3b4898..7680fd62f12 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4716,5 +4716,13 @@
<description>Number of workers executing the key lifecycle management service. This configuration should be set to a value greater than 0.</description>
</property>
-
+ <property>
+ <name>ozone.lifecycle.service.delete.cached.directory.max-count</name>
+ <value>1000000</value>
+ <tag>OZONE</tag>
+ <description>Maximum number of directory objects held in the in-memory stack while recursively evaluating an FSO bucket for a lifecycle task.
+ Once the number of cached directory objects exceeds this limit, evaluation of the involved directories is aborted.
+ </description>
+ </property>
</configuration>
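For context, a minimal sketch of how a limit like this typically bounds an in-memory work stack (hypothetical standalone Java; the property name and default mirror the constants added below, but the class and its wiring are illustrative, not the Ozone API):

    import java.util.ArrayDeque;
    import java.util.Deque;

    public class BoundedDirStack {
      // mirrors ozone.lifecycle.service.delete.cached.directory.max-count
      static final long MAX_COUNT_DEFAULT = 1_000_000L;

      private final Deque<String> stack = new ArrayDeque<>();
      private final long maxSize;

      BoundedDirStack(long maxSize) {
        this.maxSize = maxSize;
      }

      // push a pending directory; refuse once the configured limit is hit,
      // which the caller turns into "abort evaluating this subtree"
      void push(String dirPath) {
        if (stack.size() >= maxSize) {
          throw new IllegalStateException("reached maximum size " + maxSize);
        }
        stack.push(dirPath);
      }

      public static void main(String[] args) {
        // stand-in for conf.getLong(..., MAX_COUNT_DEFAULT) in the service
        BoundedDirStack s = new BoundedDirStack(MAX_COUNT_DEFAULT);
        s.push("dir1/dir2");
        System.out.println("pushed, capacity " + MAX_COUNT_DEFAULT);
      }
    }
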
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index c44a0cdf380..94e366beaed 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -195,6 +195,9 @@ private OMConfigKeys() {
public static final String OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE =
"ozone.lifecycle.service.delete.batch-size";
public static final int
OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT = 1000;
+ public static final String OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT =
+ "ozone.lifecycle.service.delete.cached.directory.max-count";
+ public static final long OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT_DEFAULT = 1000000;
/**
* OM Ratis related configurations.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
index 09b0c55efee..9e7112f1b31 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleService.java
@@ -22,6 +22,8 @@
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED_DEFAULT;
import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
@@ -29,17 +31,24 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
+import jakarta.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Paths;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Deque;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.utils.BackgroundService;
@@ -50,6 +59,8 @@
import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.KeyManager;
@@ -92,6 +103,7 @@ public class KeyLifecycleService extends BackgroundService {
private final OzoneManager ozoneManager;
private int keyDeleteBatchSize;
private int listMaxSize;
+ private long cachedDirMaxCount;
private final AtomicBoolean suspended;
private KeyLifecycleServiceMetrics metrics;
private boolean isServiceEnabled;
@@ -116,6 +128,8 @@ public KeyLifecycleService(OzoneManager ozoneManager,
Preconditions.checkArgument(keyDeleteBatchSize > 0,
OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE + " should be a positive value.");
this.listMaxSize = keyDeleteBatchSize >= 10000 ? keyDeleteBatchSize : 10000;
+ this.cachedDirMaxCount = conf.getLong(OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT,
+ OZONE_KEY_LIFECYCLE_SERVICE_DELETE_CACHED_DIRECTORY_MAX_COUNT_DEFAULT);
this.suspended = new AtomicBoolean(false);
this.metrics = KeyLifecycleServiceMetrics.create();
this.isServiceEnabled = conf.getBoolean(OZONE_KEY_LIFECYCLE_SERVICE_ENABLED,
@@ -226,6 +240,7 @@ public final class LifecycleActionTask implements BackgroundTask {
private long sizeKeyDeleted = 0;
private long numKeyRenamed = 0;
private long sizeKeyRenamed = 0;
+ private long numDirRenamed = 0;
public LifecycleActionTask(OmLifecycleConfiguration lcConfig) {
this.policy = lcConfig;
@@ -316,17 +331,11 @@ public BackgroundTaskResult call() {
// If trash is enabled, move files to trash instead of sending delete requests.
// OBS bucket doesn't support trash.
if (bucket.getBucketLayout() == OBJECT_STORE) {
- sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), bucket.getBucketName(),
- expiredKeyList, false);
- } else if (ozoneTrash != null) {
- // move keys to trash
- // TODO: move directory to trash in next patch
- moveKeysToTrash(bucket, expiredKeyList);
- } else {
sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), bucket.getBucketName(), expiredKeyList, false);
- if (!expiredDirList.isEmpty()) {
- sendDeleteKeysRequestAndClearList(bucket.getVolumeName(), bucket.getBucketName(), expiredDirList, true);
- }
+ } else {
+ // handle keys first, then directories
+ handleAndClearFullList(bucket, expiredKeyList, false);
+ handleAndClearFullList(bucket, expiredDirList, true);
}
onSuccess(bucketKey);
}
@@ -342,21 +351,17 @@ private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket, String
List<OmLCRule> prefixStartsWithTrashRuleList =
ruleList.stream().filter(r -> r.isPrefixEnable() && r.getEffectivePrefix().startsWith(
TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX)).collect(Collectors.toList());
- List<OmLCRule> directoryStylePrefixRuleList =
- ruleList.stream().filter(r -> r.isDirectoryStylePrefix()).collect(Collectors.toList());
- List<OmLCRule> nonDirectoryStylePrefixRuleList =
- ruleList.stream().filter(r -> r.isPrefixEnable() && !r.isDirectoryStylePrefix()).collect(Collectors.toList());
+ List<OmLCRule> prefixRuleList =
+ ruleList.stream().filter(r -> r.isPrefixEnable()).collect(Collectors.toList());
// r.isPrefixEnable() == false means empty filter
List<OmLCRule> noPrefixRuleList =
ruleList.stream().filter(r -> !r.isPrefixEnable()).collect(Collectors.toList());
- directoryStylePrefixRuleList.removeAll(prefixStartsWithTrashRuleList);
- nonDirectoryStylePrefixRuleList.removeAll(prefixStartsWithTrashRuleList);
+ prefixRuleList.removeAll(prefixStartsWithTrashRuleList);
prefixStartsWithTrashRuleList.stream().forEach(
r -> LOG.info("Skip rule {} as its prefix starts with {}", r,
TRASH_PREFIX + OzoneConsts.OM_KEY_PREFIX));
- Table<String, OmDirectoryInfo> directoryInfoTable = omMetadataManager.getDirectoryTable();
- for (OmLCRule rule : directoryStylePrefixRuleList) {
+ for (OmLCRule rule : prefixRuleList) {
// find KeyInfo of each directory for prefix
List<OmDirectoryInfo> dirList;
try {
@@ -366,171 +371,307 @@ private void evaluateFSOBucket(OmVolumeArgs volume, OmBucketInfo bucket, String
// skip this rule if some directory doesn't exist for this rule's prefix
continue;
}
- // use last directory's object ID to iterate the keys
- String prefix = OM_KEY_PREFIX + volume.getObjectID() +
- OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
- StringBuffer directoryPath = new StringBuffer();
+ StringBuffer lastDirPath = new StringBuffer();
+ OmDirectoryInfo lastDir = null;
if (!dirList.isEmpty()) {
- OmDirectoryInfo lastDir = dirList.get(dirList.size() - 1);
- prefix += lastDir.getObjectID();
- for (OmDirectoryInfo dir : dirList) {
- directoryPath.append(dir.getName()).append(OM_KEY_PREFIX);
+ lastDir = dirList.get(dirList.size() - 1);
+ for (int i = 0; i < dirList.size(); i++) {
+ lastDirPath.append(dirList.get(i).getName());
+ if (i != dirList.size() - 1) {
+ lastDirPath.append(OM_KEY_PREFIX);
+ }
+ }
+ if (lastDirPath.toString().startsWith(TRASH_PREFIX)) {
+ LOG.info("Skip evaluate trash directory {}", lastDirPath);
+ } else {
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, lastDirPath.toString(), lastDir,
+ Arrays.asList(rule), expiredKeyList, expiredDirList);
}
- if ((directoryPath.toString().equals(rule.getEffectivePrefix()) ||
- directoryPath.toString().equals(rule.getEffectivePrefix() + OM_KEY_PREFIX))
- && rule.match(lastDir, directoryPath.toString())) {
- if (expiredDirList.isFull()) {
- // if expiredDirList is full, send delete/rename request for expired directories
- handleAndClearFullList(bucket, expiredDirList, true);
+
+ if (!rule.getEffectivePrefix().endsWith(OM_KEY_PREFIX)) {
+ // if the prefix doesn't end with "/", then also search and evaluate the directory itself
+ // for example, "dir1/dir2" matches both directory "dir1/dir2" and "dir1/dir22"
+ // or "dir1" matches both directory "dir1" and "dir11"
+ long objID;
+ String objPrefix;
+ String objPath;
+ if (dirList.size() > 1) {
+ OmDirectoryInfo secondLastDir = dirList.get(dirList.size() - 2);
+ objID = secondLastDir.getObjectID();
+ objPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() +
+ OM_KEY_PREFIX + secondLastDir.getObjectID();
+ StringBuffer secondLastDirPath = new StringBuffer();
+ for (int i = 0; i < dirList.size() - 1; i++) {
+ secondLastDirPath.append(dirList.get(i).getName());
+ if (i != dirList.size() - 2) {
+ secondLastDirPath.append(OM_KEY_PREFIX);
+ }
+ }
+ objPath = secondLastDirPath.toString();
+ } else {
+ objID = bucket.getObjectID();
+ objPrefix = OM_KEY_PREFIX + volume.getObjectID() + OM_KEY_PREFIX + bucket.getObjectID() +
+ OM_KEY_PREFIX + bucket.getObjectID();
+ objPath = "";
+ }
+ try {
+ SubDirectorySummary subDirSummary = getSubDirectory(objID, objPrefix, omMetadataManager);
+ for (OmDirectoryInfo subDir : subDirSummary.getSubDirList()) {
+ String subDirPath = objPath.isEmpty() ? subDir.getName() : objPath + OM_KEY_PREFIX + subDir.getName();
+ if (!subDir.getName().equals(TRASH_PREFIX) && subDirPath.startsWith(rule.getEffectivePrefix()) &&
+ (lastDir == null || subDir.getObjectID() != lastDir.getObjectID())) {
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, subDirPath, subDir,
+ Arrays.asList(rule), expiredKeyList, expiredDirList);
+ }
+ }
+ } catch (IOException e) {
+ // log failure and abort evaluating this rule
+ LOG.warn("Failed to get sub directories of {} under {}/{}", objPrefix,
+ bucket.getVolumeName(), bucket.getBucketName(), e);
+ return;
}
- expiredDirList.add(directoryPath.toString(), 0, lastDir.getUpdateID());
}
+ } else {
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", null,
+ Arrays.asList(rule), expiredKeyList, expiredDirList);
}
+ }
+
+ if (!noPrefixRuleList.isEmpty()) {
+ evaluateKeyAndDirTable(bucket, volume.getObjectID(), keyTable, "", null,
+ noPrefixRuleList, expiredKeyList, expiredDirList);
+ }
+ }
- LOG.info("Prefix {} for {}", prefix, bucketKey);
- evaluateKeyTable(keyTable, prefix, directoryPath.toString(), rule,
expiredKeyList, bucket);
- evaluateDirTable(directoryInfoTable, prefix, directoryPath.toString(),
rule,
- expiredDirList, bucket);
+ @SuppressWarnings({"checkstyle:parameternumber",
"checkstyle:MethodLength"})
+ private void evaluateKeyAndDirTable(OmBucketInfo bucket, long volumeObjId,
Table<String, OmKeyInfo> keyTable,
+ String directoryPath, @Nullable OmDirectoryInfo dir, List<OmLCRule>
ruleList, LimitedExpiredObjectList keyList,
+ LimitedExpiredObjectList dirList) {
+ String volumeName = bucket.getVolumeName();
+ String bucketName = bucket.getBucketName();
+ LimitedSizeStack stack = new LimitedSizeStack(cachedDirMaxCount);
+ try {
+ if (dir != null) {
+ stack.push(new PendingEvaluateDirectory(dir, directoryPath, null));
+ } else {
+ // push a placeholder PendingEvaluateDirectory onto the stack for the bucket root
+ stack.push(new PendingEvaluateDirectory(null, "", null));
+ }
+ } catch (CapacityFullException e) {
+ LOG.warn("Abort evaluate {}/{} at {}", volumeName, bucketName, directoryPath != null ? directoryPath : "", e);
+ return;
}
- for (OmLCRule rule : nonDirectoryStylePrefixRuleList) {
- // find the directory for the prefix, it may not exist
- OmDirectoryInfo dirInfo = getDirectory(volume, bucket,
rule.getEffectivePrefix(), bucketKey);
- String prefix = OM_KEY_PREFIX + volume.getObjectID() +
- OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
- if (dirInfo != null) {
- if (!dirInfo.getName().equals(TRASH_PREFIX)) {
- prefix += dirInfo.getObjectID();
- if (dirInfo.getName().equals(rule.getEffectivePrefix()) &&
rule.match(dirInfo, dirInfo.getName())) {
- if (expiredDirList.isFull()) {
- // if expiredDirList is full, send delete/rename request for
expired directories
- handleAndClearFullList(bucket, expiredDirList, true);
+ HashSet<Long> deletedDirSet = new HashSet<>();
+ while (!stack.isEmpty()) {
+ PendingEvaluateDirectory item = stack.pop();
+ OmDirectoryInfo currentDir = item.getDirectoryInfo();
+ String currentDirPath = item.getDirPath();
+ long currentDirObjID = currentDir == null ? bucket.getObjectID() : currentDir.getObjectID();
+
+ // use current directory's object ID to iterate the keys and directories under it
+ String prefix =
+ OM_KEY_PREFIX + volumeObjId + OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX + currentDirObjID;
+ LOG.debug("Prefix {} for {}/{}", prefix, bucket.getVolumeName(), bucket.getBucketName());
+
+ // get direct sub directories
+ SubDirectorySummary subDirSummary = item.getSubDirSummary();
+ boolean newSubDirPushed = false;
+ long deletedDirCount = 0;
+ if (subDirSummary == null) {
+ try {
+ subDirSummary = getSubDirectory(currentDirObjID, prefix, omMetadataManager);
+ } catch (IOException e) {
+ // log failure, continue to process other directories in the stack
+ LOG.warn("Failed to get sub directories of {} under {}/{}", currentDirPath, volumeName, bucketName, e);
+ continue;
+ }
+
+ // filter sub directory list
+ if (!subDirSummary.getSubDirList().isEmpty()) {
+ Iterator<OmDirectoryInfo> iterator = subDirSummary.getSubDirList().iterator();
+ while (iterator.hasNext()) {
+ OmDirectoryInfo subDir = iterator.next();
+ String subDirPath = currentDirPath.isEmpty() ? subDir.getName() :
+ currentDirPath + OM_KEY_PREFIX + subDir.getName();
+ if (subDirPath.startsWith(TRASH_PREFIX)) {
+ iterator.remove();
+ continue;
+ }
+ boolean matched = false;
+ for (OmLCRule rule : ruleList) {
+ if (rule.getEffectivePrefix() != null && subDirPath.startsWith(rule.getEffectivePrefix())) {
+ matched = true;
+ break;
+ }
+ }
+ if (!matched) {
+ iterator.remove();
+ }
+ }
+ }
+
+ if (!subDirSummary.getSubDirList().isEmpty()) {
+ item.setSubDirSummary(subDirSummary);
+ try {
+ stack.push(item);
+ } catch (CapacityFullException e) {
+ LOG.warn("Abort evaluate {}/{} at {}", volumeName, bucketName, currentDirPath, e);
+ return;
+ }
+
+ // depth first evaluation, push subDirs into stack
+ for (OmDirectoryInfo subDir : subDirSummary.getSubDirList()) {
+ String subDirPath = currentDirPath.isEmpty() ? subDir.getName() :
+ currentDirPath + OM_KEY_PREFIX + subDir.getName();
+ try {
+ stack.push(new PendingEvaluateDirectory(subDir, subDirPath, null));
+ } catch (CapacityFullException e) {
+ LOG.warn("Abort evaluate {}/{} at {}", volumeName, bucketName, subDirPath, e);
+ return;
+ }
+ }
+ newSubDirPushed = true;
+ }
+ } else {
+ // this item is a parent directory; check how many of its sub directories were deleted.
+ for (OmDirectoryInfo subDir : subDirSummary.getSubDirList()) {
+ if (deletedDirSet.remove(subDir.getObjectID())) {
+ deletedDirCount++;
+ }
+ }
+ }
+
+ if (newSubDirPushed) {
+ continue;
+ }
+
+ // evaluate direct files: first check the cache, then the table
+ // there are three cases:
+ // a. key is deleted in cache, but not yet deleted in table
+ // b. key is newly added in cache, not yet in table
+ // c. key is updated in cache (rename), but not yet updated in table;
+ // in this case, the fromKey is a deleted key in cache, the toKey is a newly added key in cache,
+ // and the fromKey is still in table
+ long numKeysUnderDir = 0;
+ long numKeysExpired = 0;
+ HashSet<String> deletedKeySetInCache = new HashSet<>();
+ HashSet<String> keySetInCache = new HashSet<>();
+ Iterator<Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>>> cacheIter = keyTable.cacheIterator();
+ while (cacheIter.hasNext()) {
+ Map.Entry<CacheKey<String>, CacheValue<OmKeyInfo>> entry = cacheIter.next();
+ OmKeyInfo key = entry.getValue().getCacheValue();
+ if (key == null) {
+ deletedKeySetInCache.add(entry.getKey().getCacheKey());
+ continue;
+ }
+ if (key.getParentObjectID() == currentDirObjID) {
+ numKeysUnderDir++;
+ keySetInCache.add(entry.getKey().getCacheKey());
+ String keyPath = currentDirPath.isEmpty() ? key.getKeyName() :
+ currentDirPath + OM_KEY_PREFIX + key.getKeyName();
+ for (OmLCRule rule : ruleList) {
+ if (rule.match(key, keyPath)) {
+ // mark key as expired, check next key
+ if (keyList.isFull()) {
+ // if keyList is full, send delete/rename request for expired keys
+ handleAndClearFullList(bucket, keyList, false);
+ }
+ keyList.add(keyPath, key.getReplicatedSize(), key.getUpdateID());
+ numKeysExpired++;
+ break;
}
- expiredDirList.add(dirInfo.getName(), 0, dirInfo.getUpdateID());
}
- } else {
- dirInfo = null;
- LOG.info("Skip evaluate trash directory {}", TRASH_PREFIX);
}
}
- LOG.info("Prefix {} for {}", prefix, bucketKey);
- evaluateKeyTable(keyTable, prefix, dirInfo == null ? "" : dirInfo.getName() + OzoneConsts.OM_KEY_PREFIX,
- rule, expiredKeyList, bucket);
- evaluateDirTable(directoryInfoTable, prefix,
- dirInfo == null ? "" : dirInfo.getName() + OzoneConsts.OM_KEY_PREFIX, rule, expiredDirList, bucket);
- }
- if (!noPrefixRuleList.isEmpty()) {
- String prefix = OM_KEY_PREFIX + volume.getObjectID() +
- OM_KEY_PREFIX + bucket.getObjectID() + OM_KEY_PREFIX;
- LOG.info("prefix {} for {}", prefix, bucketKey);
- // use bucket name as key iterator prefix
try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyTblItr =
keyTable.iterator(prefix)) {
while (keyTblItr.hasNext()) {
Table.KeyValue<String, OmKeyInfo> keyValue = keyTblItr.next();
OmKeyInfo key = keyValue.getValue();
+ String keyPath = currentDirPath.isEmpty() ? key.getKeyName() :
+ currentDirPath + OM_KEY_PREFIX + key.getKeyName();
numKeyIterated++;
- for (OmLCRule rule : noPrefixRuleList) {
- if (rule.match(key)) {
+ if (deletedKeySetInCache.remove(keyValue.getKey()) || keySetInCache.remove(keyValue.getKey())) {
+ continue;
+ }
+ numKeysUnderDir++;
+ for (OmLCRule rule : ruleList) {
+ if (key.getParentObjectID() == currentDirObjID && rule.match(key, keyPath)) {
// mark key as expired, check next key
- if (expiredKeyList.isFull()) {
- // if expiredKeyList is full, send delete/rename request for expired keys
- handleAndClearFullList(bucket, expiredKeyList, false);
+ if (keyList.isFull()) {
+ // if keyList is full, send delete request for pending deletion keys
+ handleAndClearFullList(bucket, keyList, false);
}
- expiredKeyList.add(key.getKeyName(), key.getReplicatedSize(), key.getUpdateID());
- break;
+ keyList.add(keyPath, key.getReplicatedSize(), key.getUpdateID());
+ numKeysExpired++;
}
}
}
} catch (IOException e) {
- // log failure and continue the process to delete/move files already identified in this run
- LOG.warn("Failed to iterate keyTable for bucket {}", bucketKey, e);
+ // log failure and continue processing other directories in the stack
+ LOG.warn("Failed to iterate keyTable for bucket {}/{}", volumeName, bucketName, e);
+ continue;
}
- try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>> dirTblItr =
- directoryInfoTable.iterator(prefix)) {
- while (dirTblItr.hasNext()) {
- Table.KeyValue<String, OmDirectoryInfo> entry = dirTblItr.next();
- OmDirectoryInfo dir = entry.getValue();
- numDirIterated++;
- // skip TRASH_PREFIX directory
- if (dir.getName().equals(TRASH_PREFIX)) {
- continue;
- }
- for (OmLCRule rule : noPrefixRuleList) {
- if (rule.match(dir, dir.getPath())) {
- // mark directory as expired, check next directory
- if (expiredDirList.isFull()) {
- // if expiredDirList is full, send delete/rename request for expired directories
- handleAndClearFullList(bucket, expiredDirList, true);
- }
- expiredDirList.add(dir.getPath(), 0, dir.getUpdateID());
- break;
+ // if this directory is empty, or all of its files/subDirs are expired, evaluate the directory itself
+ if ((numKeysUnderDir == 0 && subDirSummary.getSubDirCount() == 0) ||
+ (numKeysUnderDir == numKeysExpired && deletedDirCount == subDirSummary.getSubDirCount())) {
+ for (OmLCRule rule : ruleList) {
+ String path = (rule.getEffectivePrefix() != null && rule.getEffectivePrefix().endsWith(OM_KEY_PREFIX)) ?
+ currentDirPath + OM_KEY_PREFIX : currentDirPath;
+ if (currentDir != null && rule.match(currentDir, path)) {
+ if (dirList.isFull()) {
+ // if dirList is full, send delete request for pending deletion directories
+ handleAndClearFullList(bucket, dirList, true);
}
+ dirList.add(currentDirPath, 0, currentDir.getUpdateID());
+ deletedDirSet.add(currentDir.getObjectID());
}
}
- } catch (IOException e) {
- // log failure and continue the process to delete/move directories already identified in this run
- LOG.warn("Failed to iterate directoryTable for bucket {}", bucketKey, e);
}
}
}
- private void evaluateKeyTable(Table<String, OmKeyInfo> keyTable, String prefix, String directoryPath,
- OmLCRule rule, LimitedExpiredObjectList keyList, OmBucketInfo bucket) {
- String volumeName = bucket.getVolumeName();
- String bucketName = bucket.getBucketName();
- try (TableIterator<String, ? extends Table.KeyValue<String, OmKeyInfo>> keyTblItr =
- keyTable.iterator(prefix)) {
- while (keyTblItr.hasNext()) {
- Table.KeyValue<String, OmKeyInfo> keyValue = keyTblItr.next();
- OmKeyInfo key = keyValue.getValue();
- String keyPath = directoryPath + key.getKeyName();
- numKeyIterated++;
- if (rule.match(key, keyPath)) {
- // mark key as expired, check next key
- if (keyList.isFull()) {
- // if keyList is full, send delete/rename request for expired keys
- handleAndClearFullList(bucket, keyList, false);
- }
- keyList.add(keyPath, key.getReplicatedSize(), key.getUpdateID());
- }
+ private SubDirectorySummary getSubDirectory(long dirObjID, String prefix, OMMetadataManager metaMgr)
+ throws IOException {
+ SubDirectorySummary subDirList = new SubDirectorySummary();
+
+ // Check all dirTable cache for any sub paths.
+ Table<String, OmDirectoryInfo> dirTable = metaMgr.getDirectoryTable();
+ Iterator<Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>>>
+ cacheIter = dirTable.cacheIterator();
+ HashSet<String> deletedDirSet = new HashSet<>();
+ while (cacheIter.hasNext()) {
+ Map.Entry<CacheKey<String>, CacheValue<OmDirectoryInfo>> entry =
+ cacheIter.next();
+ numDirIterated++;
+ OmDirectoryInfo cacheOmDirInfo = entry.getValue().getCacheValue();
+ if (cacheOmDirInfo == null) {
+ deletedDirSet.add(entry.getKey().getCacheKey());
+ continue;
+ }
+ if (cacheOmDirInfo.getParentObjectID() == dirObjID) {
+ subDirList.addSubDir(cacheOmDirInfo);
}
- } catch (IOException e) {
- // log failure and continue the process to delete/move files already identified in this run
- LOG.warn("Failed to iterate keyTable for bucket {}/{}", volumeName, bucketName, e);
}
- }
- private void evaluateDirTable(Table<String, OmDirectoryInfo> directoryInfoTable, String prefix,
- String directoryPath, OmLCRule rule, LimitedExpiredObjectList dirList, OmBucketInfo bucket) {
- String volumeName = bucket.getVolumeName();
- String bucketName = bucket.getBucketName();
- try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>> dirTblItr =
- directoryInfoTable.iterator(prefix)) {
- while (dirTblItr.hasNext()) {
- Table.KeyValue<String, OmDirectoryInfo> entry = dirTblItr.next();
- OmDirectoryInfo dir = entry.getValue();
+ // Check dirTable entries for any sub paths.
+ try (TableIterator<String, ? extends Table.KeyValue<String, OmDirectoryInfo>>
+ iterator = dirTable.iterator(prefix)) {
+ while (iterator.hasNext()) {
numDirIterated++;
- // skip TRASH_PREFIX directory
- if (dir.getName().equals(TRASH_PREFIX)) {
+ Table.KeyValue<String, OmDirectoryInfo> entry = iterator.next();
+ OmDirectoryInfo dir = entry.getValue();
+ if (deletedDirSet.contains(entry.getKey())) {
continue;
}
- String dirPath = directoryPath + dir.getName();
- if (rule.match(dir, dirPath)) {
- // mark dir as expired, check next key
- if (dirList.isFull()) {
- // if dirList is full, send delete/rename request for expired directories
- handleAndClearFullList(bucket, dirList, true);
- }
- dirList.add(dirPath, 0, dir.getUpdateID());
+ if (dir.getParentObjectID() == dirObjID) {
+ subDirList.addSubDir(dir);
}
}
- } catch (IOException e) {
- // log failure and continue the process to delete/move files already identified in this run
- LOG.warn("Failed to iterate directoryTable for bucket {}/{}", volumeName, bucketName, e);
}
+ return subDirList;
}
private void evaluateBucket(OmBucketInfo bucketInfo,
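The heart of evaluateKeyAndDirTable above is a two-phase, iterative depth-first walk: on first visit a directory is pushed back onto the stack together with its sub-directory summary, its children are evaluated first, and the parent is reconsidered once all children have been seen. A compact, self-contained sketch of that control flow (hypothetical names; listChildren and the expiry decision are stubs standing in for the Ozone tables and lifecycle rules):

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class TwoPhaseDfs {
      static class Node {
        final String path;
        List<String> children; // null until first visit
        Node(String path) { this.path = path; }
      }

      // stand-in for getSubDirectory(): direct sub-directories of a path
      static List<String> listChildren(String path) {
        if (path.isEmpty()) { return List.of("dir1"); }
        if (path.equals("dir1")) { return List.of("dir1/dir2"); }
        return List.of();
      }

      public static void main(String[] args) {
        Deque<Node> stack = new ArrayDeque<>(); // bounded in the real service
        Set<String> expired = new HashSet<>();  // plays the deletedDirSet role
        stack.push(new Node(""));               // "" stands for the bucket root
        while (!stack.isEmpty()) {
          Node cur = stack.pop();
          if (cur.children == null) {           // first visit: expand
            cur.children = listChildren(cur.path);
            if (!cur.children.isEmpty()) {
              stack.push(cur);                  // revisit after the children
              cur.children.forEach(c -> stack.push(new Node(c)));
              continue;
            }
          }
          // second visit (or leaf): how many children were marked expired?
          long gone = 0;
          for (String c : cur.children) {
            if (expired.remove(c)) { gone++; }
          }
          // a directory expires only if it is empty or all children expired
          if (gone == cur.children.size() && !cur.path.isEmpty()) {
            expired.add(cur.path);              // rule matching is stubbed out
            System.out.println("expired dir: " + cur.path);
          }
        }
      }
    }
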
@@ -577,17 +718,6 @@ private void evaluateBucket(OmBucketInfo bucketInfo,
}
}
- private OmDirectoryInfo getDirectory(OmVolumeArgs volume, OmBucketInfo bucket, String prefix, String bucketKey) {
- String dbDirName = omMetadataManager.getOzonePathKey(
- volume.getObjectID(), bucket.getObjectID(), bucket.getObjectID(), prefix);
- try {
- return omMetadataManager.getDirectoryTable().get(dbDirName);
- } catch (IOException e) {
- LOG.info("Failed to get directory object of {} for bucket {}", dbDirName, bucketKey);
- return null;
- }
- }
-
/**
* If prefix is /dir1/dir2 but dir1 doesn't exist, it will throw an exception.
* If prefix is /dir1/dir2 but dir2 doesn't exist, it will return a list with dir1 only.
@@ -641,13 +771,13 @@ private void onSuccess(String bucketName) {
metrics.incNumKeyIterated(numKeyIterated);
metrics.incNumDirIterated(numDirIterated);
LOG.info("Spend {} ms on bucket {} to iterate {} keys and {} dirs,
deleted {} keys with {} bytes, " +
- "and {} dirs, renamed {} keys with {} bytes to trash", timeSpent,
bucketName, numKeyIterated, numDirIterated,
- numKeyDeleted, sizeKeyDeleted, numDirDeleted, numKeyRenamed,
sizeKeyRenamed);
+ "and {} dirs, renamed {} keys with {} bytes, and {} dirs to trash",
timeSpent, bucketName, numKeyIterated,
+ numDirIterated, numKeyDeleted, sizeKeyDeleted, numDirDeleted,
numKeyRenamed, sizeKeyRenamed, numDirRenamed);
}
private void handleAndClearFullList(OmBucketInfo bucket,
LimitedExpiredObjectList keysList, boolean dir) {
if (bucket.getBucketLayout() != OBJECT_STORE && ozoneTrash != null) {
- moveKeysToTrash(bucket, keysList);
+ moveToTrash(bucket, keysList, dir);
} else {
sendDeleteKeysRequestAndClearList(bucket.getVolumeName(),
bucket.getBucketName(), keysList, dir);
}
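handleAndClearFullList is what keeps expiredKeyList and expiredDirList bounded: whenever a list fills up it is drained into one delete or trash-move request and cleared before evaluation continues. A minimal sketch of that flush-when-full pattern (generic, hypothetical names; the real dispatch depends on bucket layout and whether trash is enabled):

    import java.util.ArrayList;
    import java.util.List;

    public class FlushWhenFull {
      private final List<String> batch = new ArrayList<>();
      private final int capacity;

      FlushWhenFull(int capacity) { this.capacity = capacity; }

      // add an expired entry, draining the batch first if it is at capacity
      void add(String path) {
        if (batch.size() >= capacity) {
          flush();
        }
        batch.add(path);
      }

      // stand-in for one sendDeleteKeysRequestAndClearList / moveToTrash call
      void flush() {
        System.out.println("flushing " + batch.size() + " expired entries");
        batch.clear();
      }

      public static void main(String[] args) {
        FlushWhenFull list = new FlushWhenFull(1000); // delete.batch-size style cap
        for (int i = 0; i < 2500; i++) {
          list.add("key-" + i);
        }
        list.flush(); // drain the remainder, as the task does at the end of a run
      }
    }
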
@@ -748,16 +878,16 @@ private void sendDeleteKeysRequestAndClearList(String volume, String bucket,
}
}
- private void moveKeysToTrash(OmBucketInfo bucket, LimitedExpiredObjectList keysList) {
+ private void moveToTrash(OmBucketInfo bucket, LimitedExpiredObjectList keysList, boolean isDir) {
if (keysList.isEmpty()) {
return;
}
String volumeName = bucket.getVolumeName();
String bucketName = bucket.getBucketName();
- String trashCurrent;
- UserGroupInformation ugi = UserGroupInformation.createRemoteUser(bucket.getOwner());
+ String trashRoot = TRASH_PREFIX + OM_KEY_PREFIX + bucket.getOwner();
+ Path trashCurrent = new Path(trashRoot, CURRENT);
try {
- trashCurrent = checkAndCreateTrashDirectoryIfNeeded(bucket, ugi);
+ checkAndCreateTrashDirIfNeeded(bucket, trashCurrent);
} catch (IOException e) {
keysList.clear();
return;
@@ -765,6 +895,15 @@ private void moveKeysToTrash(OmBucketInfo bucket, LimitedExpiredObjectList keysL
for (int i = 0; i < keysList.size(); i++) {
String keyName = keysList.getName(i);
+ Path keyPath = new Path(OzoneConsts.OZONE_URI_DELIMITER + keyName);
+ Path baseKeyTrashPath = Path.mergePaths(trashCurrent, keyPath.getParent());
+ try {
+ checkAndCreateTrashDirIfNeeded(bucket, baseKeyTrashPath);
+ } catch (IOException e) {
+ LOG.error("Failed to check and create Trash dir {} for bucket {}/{}", baseKeyTrashPath,
+ volumeName, bucketName, e);
+ continue;
+ }
String targetKeyName = trashCurrent + OM_KEY_PREFIX + keyName;
KeyArgs keyArgs = KeyArgs.newBuilder().setKeyName(keyName)
.setVolumeName(volumeName).setBucketName(bucketName).build();
@@ -790,6 +929,7 @@ private void moveKeysToTrash(OmBucketInfo bucket, LimitedExpiredObjectList keysL
try {
// perform preExecute, as ratis submit does not perform preExecute
OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(bucket.getOwner());
OzoneManagerProtocolProtos.OMResponse omResponse =
ugi.doAs(new PrivilegedExceptionAction<OzoneManagerProtocolProtos.OMResponse>() {
@Override
@@ -806,35 +946,37 @@ public OzoneManagerProtocolProtos.OMResponse run() throws Exception {
continue;
}
}
- LOG.debug("RenameKey request succeed with source key: {}, dest key:
{}", keyName, targetKeyName);
- numKeyRenamed += 1;
- sizeKeyRenamed += keysList.getReplicatedSize(i);
- metrics.incrNumKeyRenamed(1);
- metrics.incrSizeKeyRenamed(keysList.getReplicatedSize(i));
- } catch (InterruptedException | IOException e) {
+ LOG.info("RenameKey request succeed with source key: {}, dest key:
{}", keyName, targetKeyName);
+
+ if (isDir) {
+ numDirRenamed += 1;
+ metrics.incrNumDirRenamed(1);
+ } else {
+ numKeyRenamed += 1;
+ sizeKeyRenamed += keysList.getReplicatedSize(i);
+ metrics.incrNumKeyRenamed(1);
+ metrics.incrSizeKeyRenamed(keysList.getReplicatedSize(i));
+ }
+ } catch (IOException | InterruptedException e) {
LOG.error("Failed to send RenameKeysRequest", e);
}
}
keysList.clear();
}
- private String checkAndCreateTrashDirectoryIfNeeded(OmBucketInfo bucket, UserGroupInformation ugi)
- throws IOException {
- String userTrashRoot = TRASH_PREFIX + OM_KEY_PREFIX + bucket.getOwner();
- String userTrashCurrent = userTrashRoot + OM_KEY_PREFIX + CURRENT;
+ private void checkAndCreateTrashDirIfNeeded(OmBucketInfo bucket, Path dirPath) throws IOException {
+ OmKeyArgs key = new OmKeyArgs.Builder().setVolumeName(bucket.getVolumeName())
+ .setBucketName(bucket.getBucketName()).setKeyName(dirPath.toString())
+ .setOwnerName(bucket.getOwner()).build();
try {
- OmKeyArgs key = new OmKeyArgs.Builder().setVolumeName(bucket.getVolumeName())
- .setBucketName(bucket.getBucketName()).setKeyName(userTrashCurrent)
- .setOwnerName(bucket.getOwner()).build();
ozoneManager.getFileStatus(key);
- return userTrashCurrent;
} catch (IOException e) {
if (e instanceof OMException &&
(((OMException) e).getResult() == OMException.ResultCodes.FILE_NOT_FOUND ||
((OMException) e).getResult() == OMException.ResultCodes.DIRECTORY_NOT_FOUND)) {
// create the trash/Current directory for user
KeyArgs keyArgs = KeyArgs.newBuilder().setVolumeName(bucket.getVolumeName())
- .setBucketName(bucket.getBucketName()).setKeyName(userTrashCurrent)
+ .setBucketName(bucket.getBucketName()).setKeyName(dirPath.toString())
.setOwnerName(bucket.getOwner()).setRecursive(true).build();
OMRequest omRequest = OMRequest.newBuilder().setCreateDirectoryRequest(
CreateDirectoryRequest.newBuilder().setKeyArgs(keyArgs))
@@ -845,6 +987,7 @@ private String checkAndCreateTrashDirectoryIfNeeded(OmBucketInfo bucket, UserGro
try {
// perform preExecute, as ratis submit does not perform preExecute
final OMClientRequest omClientRequest =
OzoneManagerRatisUtils.createClientRequest(omRequest, ozoneManager);
+ UserGroupInformation ugi = UserGroupInformation.createRemoteUser(bucket.getOwner());
OzoneManagerProtocolProtos.OMResponse omResponse =
ugi.doAs(new PrivilegedExceptionAction<OzoneManagerProtocolProtos.OMResponse>() {
@Override
@@ -858,19 +1001,19 @@ public OzoneManagerProtocolProtos.OMResponse run() throws Exception {
if (omResponse != null) {
if (!omResponse.getSuccess()) {
LOG.error("CreateDirectory request failed with {}, path: {}",
- omResponse.getMessage(), userTrashCurrent);
- throw new IOException("Failed to create trash directory " +
userTrashCurrent);
+ omResponse.getMessage(), dirPath);
+ throw new IOException("Failed to create trash directory " +
dirPath);
}
- LOG.debug("Created trash current directory: {}",
userTrashCurrent);
- return userTrashCurrent;
}
+ LOG.info("Created directory {}/{}/{}", bucket.getVolumeName(),
bucket.getBucketName(), dirPath);
} catch (InterruptedException | IOException e1) {
- LOG.error("Failed to send CreateDirectoryRequest for {}",
userTrashCurrent, e1);
- throw new IOException("Failed to send CreateDirectoryRequest
request for " + userTrashCurrent);
+ LOG.error("Failed to send CreateDirectoryRequest for {}", dirPath,
e1);
+ throw new IOException("Failed to send CreateDirectoryRequest
request for " + dirPath);
}
+ } else {
+ LOG.error("Failed to get trash current directory {} status",
dirPath, e);
+ throw e;
}
- LOG.error("Failed to get trash current directory {} status",
userTrashCurrent, e);
- throw e;
}
}
}
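One subtlety in the moveToTrash changes above: expired keys may now sit under nested directories, so the key's parent chain must exist under .Trash/<owner>/Current before the rename is submitted. A small illustration of the Path arithmetic with org.apache.hadoop.fs.Path (owner and key names are made up):

    import org.apache.hadoop.fs.Path;

    public class TrashPathDemo {
      public static void main(String[] args) {
        // trashRoot = TRASH_PREFIX + "/" + owner; CURRENT is "Current"
        Path trashCurrent = new Path(".Trash/alice", "Current");
        Path keyPath = new Path("/dir1/dir2/key-0001");
        // the parent of the key, re-rooted under .Trash/alice/Current:
        Path baseKeyTrashPath = Path.mergePaths(trashCurrent, keyPath.getParent());
        // prints .Trash/alice/Current/dir1/dir2 -- created via a recursive
        // CreateDirectory request before the RenameKey request is sent
        System.out.println(baseKeyTrashPath);
      }
    }
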
@@ -1020,4 +1163,98 @@ public void clear() {
internalList.clear();
}
}
+
+ /**
+ * An in-memory class holding the information required for recursive directory evaluation.
+ */
+ public static class PendingEvaluateDirectory {
+ private final OmDirectoryInfo directoryInfo;
+ private final String dirPath;
+ private SubDirectorySummary subDirSummary;
+
+ public PendingEvaluateDirectory(OmDirectoryInfo dir, String path, SubDirectorySummary summary) {
+ this.directoryInfo = dir;
+ this.dirPath = path;
+ this.subDirSummary = summary;
+ }
+
+ public String getDirPath() {
+ return dirPath;
+ }
+
+ public OmDirectoryInfo getDirectoryInfo() {
+ return directoryInfo;
+ }
+
+ public SubDirectorySummary getSubDirSummary() {
+ return subDirSummary;
+ }
+
+ public void setSubDirSummary(SubDirectorySummary summary) {
+ subDirSummary = summary;
+ }
+ }
+
+ /**
+ * An in-memory class to hold sub directory summary.
+ */
+ public static class SubDirectorySummary {
+ private final List<OmDirectoryInfo> subDirList;
+ private long subDirCount;
+
+ public SubDirectorySummary() {
+ this.subDirList = new ArrayList<>();
+ this.subDirCount = 0;
+ }
+
+ public long getSubDirCount() {
+ return subDirCount;
+ }
+
+ public List<OmDirectoryInfo> getSubDirList() {
+ return subDirList;
+ }
+
+ public void addSubDir(OmDirectoryInfo dir) {
+ subDirList.add(dir);
+ subDirCount++;
+ }
+ }
+
+ /**
+ * An in-memory stack with a maximum size. This class is not thread safe.
+ */
+ public static class LimitedSizeStack {
+ private final Deque<PendingEvaluateDirectory> stack;
+ private final long maxSize;
+
+ public LimitedSizeStack(long maxSize) {
+ this.maxSize = maxSize;
+ this.stack = new ArrayDeque<>();
+ }
+
+ public boolean isEmpty() {
+ return stack.isEmpty();
+ }
+
+ public void push(PendingEvaluateDirectory e) throws CapacityFullException {
+ if (stack.size() >= maxSize) {
+ throw new CapacityFullException("LimitedSizeStack has reached maximum size " + maxSize);
+ }
+ stack.push(e);
+ }
+
+ public PendingEvaluateDirectory pop() {
+ return stack.pop();
+ }
+ }
+
+ /**
+ * An exception which indicates the collection is full.
+ */
+ public static class CapacityFullException extends Exception {
+ public CapacityFullException(String message) {
+ super(message);
+ }
+ }
}
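A short usage sketch of the LimitedSizeStack added above, showing the abort behavior callers rely on (hypothetical driver fragment; it assumes only the public nested classes from this patch are on the classpath):

    // a real caller is evaluateKeyAndDirTable above
    KeyLifecycleService.LimitedSizeStack stack =
        new KeyLifecycleService.LimitedSizeStack(2);
    try {
      stack.push(new KeyLifecycleService.PendingEvaluateDirectory(null, "dir1", null));
      stack.push(new KeyLifecycleService.PendingEvaluateDirectory(null, "dir1/dir2", null));
      // third push exceeds maxSize == 2 and throws CapacityFullException
      stack.push(new KeyLifecycleService.PendingEvaluateDirectory(null, "dir1/dir2/dir3", null));
    } catch (KeyLifecycleService.CapacityFullException e) {
      // the service logs "Abort evaluate ..." and gives up on this subtree
      System.out.println(e.getMessage());
    }
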
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
index 77efe4f2287..874b133777b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/KeyLifecycleServiceMetrics.java
@@ -81,6 +81,8 @@ public static void unregister() {
private MutableGaugeLong numKeyDeleted;
@Metric("Total keys renamed")
private MutableGaugeLong numKeyRenamed;
+ @Metric("Total directories renamed")
+ private MutableGaugeLong numDirRenamed;
@Metric("Total size of keys deleted")
private MutableGaugeLong sizeKeyDeleted;
@Metric("Total size of keys renamed")
@@ -110,6 +112,10 @@ public void incrNumKeyRenamed(long keyCount) {
numKeyRenamed.incr(keyCount);
}
+ public void incrNumDirRenamed(long dirCount) {
+ numDirRenamed.incr(dirCount);
+ }
+
public void incrSizeKeyDeleted(long size) {
sizeKeyDeleted.incr(size);
}
@@ -130,6 +136,10 @@ public MutableGaugeLong getNumKeyRenamed() {
return numKeyRenamed;
}
+ public MutableGaugeLong getNumDirRenamed() {
+ return numDirRenamed;
+ }
+
public MutableGaugeLong getSizeKeyDeleted() {
return sizeKeyDeleted;
}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
index 4a665a28ad0..cb54173266b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestKeyLifecycleService.java
@@ -27,12 +27,14 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_ENABLED;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_KEY_LIFECYCLE_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.om.OmConfig.Keys.ENABLE_FILESYSTEM_PATHS;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
import static org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.ALL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -47,6 +49,7 @@
import java.security.PrivilegedExceptionAction;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
+import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -171,6 +174,7 @@ private void createConfig(File testDir) {
conf.setTimeDuration(OZONE_KEY_LIFECYCLE_SERVICE_INTERVAL, SERVICE_INTERVAL, TimeUnit.MILLISECONDS);
conf.setInt(OZONE_KEY_LIFECYCLE_SERVICE_DELETE_BATCH_SIZE, 50);
conf.setQuietMode(false);
+ conf.setBoolean(ENABLE_FILESYSTEM_PATHS, false);
OmLCExpiration.setTest(true);
}
@@ -1237,8 +1241,9 @@ void testPerformanceWithNestedDir(BucketLayout bucketLayout, String prefix)
GenericTestUtils.waitFor(() -> metrics.getNumKeyDeleted().value() - initialKeyDeleted == keyCount,
WAIT_CHECK_INTERVAL, 5000);
assertEquals(0, metrics.getNumDirIterated().value() - initialDirIterated);
- assertEquals(bucketLayout == FILE_SYSTEM_OPTIMIZED ? 1 : 0,
- metrics.getNumDirDeleted().value() - initialDirDeleted);
+ GenericTestUtils.waitFor(() ->
+ metrics.getNumDirDeleted().value() - initialDirDeleted == (bucketLayout == FILE_SYSTEM_OPTIMIZED ? 1 : 0),
+ WAIT_CHECK_INTERVAL, 10000);
deleteLifecyclePolicy(volumeName, bucketName);
}
@@ -1308,16 +1313,24 @@ void testListMaxSize(BucketLayout bucketLayout, boolean enableTrash) throws IOEx
deleteLifecyclePolicy(volumeName, bucketName);
}
+ public Stream<Arguments> parameters7() {
+ return Stream.of(
+ arguments("FILE_SYSTEM_OPTIMIZED", "key"),
+ arguments("FILE_SYSTEM_OPTIMIZED", "dir/key"),
+ arguments("LEGACY", "key"),
+ arguments("LEGACY", "dir/key"));
+ }
+
@ParameterizedTest
- @ValueSource(strings = {"FILE_SYSTEM_OPTIMIZED", "LEGACY"})
- void testMoveToTrash(BucketLayout bucketLayout) throws IOException,
+ @MethodSource("parameters7")
+ void testMoveToTrash(BucketLayout bucketLayout, String prefix) throws IOException,
TimeoutException, InterruptedException {
final String volumeName = getTestName();
final String bucketName = uniqueObjectName("bucket");
- String prefix = "key";
long initialDeletedKeyCount = getDeletedKeyCount();
long initialKeyCount = getKeyCount(bucketLayout);
long initialRenamedKeyCount = metrics.getNumKeyRenamed().value();
+ long initialRenamedDirCount = metrics.getNumDirRenamed().value();
// create keys
String bucketOwner =
UserGroupInformation.getCurrentUser().getShortUserName() + "-test";
List<OmKeyArgs> keyList =
@@ -1344,6 +1357,12 @@ void testMoveToTrash(BucketLayout bucketLayout) throws IOException,
GenericTestUtils.waitFor(() ->
(metrics.getNumKeyRenamed().value() - initialRenamedKeyCount) == KEY_COUNT, WAIT_CHECK_INTERVAL, 50000);
assertEquals(0, getDeletedKeyCount() - initialDeletedKeyCount);
+ if (bucketLayout == FILE_SYSTEM_OPTIMIZED) {
+ // Legacy buckets don't have a directory concept
+ GenericTestUtils.waitFor(() ->
+ metrics.getNumDirRenamed().value() - initialRenamedDirCount == (prefix.contains(OM_KEY_PREFIX) ? 1 : 0),
+ WAIT_CHECK_INTERVAL, 5000);
+ }
deleteLifecyclePolicy(volumeName, bucketName);
// verify trash directory has the right native ACLs
List<KeyInfoWithVolumeContext> dirList = new ArrayList<>();
@@ -1378,7 +1397,7 @@ void testMoveToTrash(BucketLayout bucketLayout) throws IOException,
assertEquals(KEY_COUNT, getKeyCount(bucketLayout) - initialKeyCount);
} else {
// For legacy buckets, trash directories along .Trash/user-test/Current are in the key table too.
- assertEquals(KEY_COUNT + 3, getKeyCount(bucketLayout) - initialKeyCount);
+ assertEquals(KEY_COUNT + (prefix.contains(OM_KEY_PREFIX) ? 4 : 3), getKeyCount(bucketLayout) - initialKeyCount);
}
// create new policy to test that a rule with prefix ".Trash/" is ignored during lifecycle evaluation
now = ZonedDateTime.now(ZoneOffset.UTC);
@@ -1401,6 +1420,15 @@ void testMoveToTrash(BucketLayout bucketLayout) throws IOException,
() -> log.getOutput().contains("Skip evaluate trash directory " + TRASH_PREFIX), WAIT_CHECK_INTERVAL, 5000);
deleteLifecyclePolicy(volumeName, bucketName);
+ // create new policy to test that a rule with prefix ".Tras" is ignored during lifecycle evaluation
+ now = ZonedDateTime.now(ZoneOffset.UTC);
+ date = now.plusSeconds(EXPIRE_SECONDS);
+ createLifecyclePolicy(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, ".Tras", null, date.toString(), true);
+
+ GenericTestUtils.waitFor(
+ () -> log.getOutput().contains("Skip evaluate trash directory " + TRASH_PREFIX), WAIT_CHECK_INTERVAL, 5000);
+ deleteLifecyclePolicy(volumeName, bucketName);
+
// create new policy to test that the trash directory is skipped during lifecycle evaluation
now = ZonedDateTime.now(ZoneOffset.UTC);
date = now.plusSeconds(EXPIRE_SECONDS);
@@ -1410,6 +1438,104 @@ void testMoveToTrash(BucketLayout bucketLayout) throws IOException,
() -> log.getOutput().contains("No expired keys/dirs found/remained for bucket"), WAIT_CHECK_INTERVAL, 5000);
deleteLifecyclePolicy(volumeName, bucketName);
}
+
+ public Stream<Arguments> parameters8() {
+ return Stream.of(
+ arguments("dir1/dir2/dir3/key", null, "dir1/dir", "dir1/dir2/dir3", KEY_COUNT, 2, false),
+ arguments("dir1/dir2/dir3/key", null, "dir1/dir", "dir1/dir2/dir3", KEY_COUNT, 0, true),
+ arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "dir1/dir", "dir1/dir2/dir3", KEY_COUNT * 2, 4, false),
+ arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "dir1/dir", "dir1/dir2/dir3", KEY_COUNT * 2, 2, true),
+ arguments("dir1/dir2/dir3/key", "dir1/dir22/dir5/key", "dir1/dir2/", "dir1/dir2/dir3", KEY_COUNT, 2, false),
+ arguments("dir1/dir2/dir3/key", "dir1/dir22/dir5/key", "dir1/dir2/", "dir1/dir2/dir3", KEY_COUNT, 0, true),
+ arguments("dir1/dir2/dir3/key", "dir1/dir22/dir5/key", "dir1/dir2", "dir1/dir2/dir3", KEY_COUNT * 2, 4, false),
+ arguments("dir1/dir2/dir3/key", "dir1/dir22/dir5/key", "dir1/dir2", "dir1/dir2/dir3", KEY_COUNT * 2, 2, true),
+ arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "dir", "dir1/dir2/dir3", KEY_COUNT * 2, 5, false),
+ arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "dir", "dir1/dir2/dir3", KEY_COUNT * 2, 2, true),
+ arguments("dir1/dir2/dir3/key", "dir11/dir4/dir5/key", "dir1/", "dir1/dir2/dir3", KEY_COUNT, 3, false),
+ arguments("dir1/dir2/dir3/key", "dir11/dir4/dir5/key", "dir1/", "dir1/dir2/dir3", KEY_COUNT, 0, true),
+ arguments("dir1/dir2/dir3/key", "dir11/dir4/dir5/key", "dir1", "dir1/dir2/dir3", KEY_COUNT * 2, 6, false),
+ arguments("dir1/dir2/dir3/key", "dir11/dir4/dir5/key", "dir1", "dir1/dir2/dir3", KEY_COUNT * 2, 3, true),
+ arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "", "dir1/dir2/dir3", KEY_COUNT * 2, 5, false),
+ arguments("dir1/dir2/dir3/key", "dir1/dir4/dir5/key", "", "dir1/dir2/dir3", KEY_COUNT * 2, 2, true));
+ }
+
+ @ParameterizedTest
+ @MethodSource("parameters8")
+ void testMultipleDirectoriesMatched(String keyPrefix1, String keyPrefix2, String rulePrefix, String dirName,
+ int expectedDeletedKeyCount, int expectedDeletedDirCount, boolean updateDirModificationTime)
+ throws IOException, TimeoutException, InterruptedException {
+ final String volumeName = getTestName();
+ final String bucketName = uniqueObjectName("bucket");
+ long initialDeletedKeyCount = getDeletedKeyCount();
+ long initialDeletedDirCount = getDeletedDirectoryCount();
+ long initialKeyCount = getKeyCount(FILE_SYSTEM_OPTIMIZED);
+ // create keys
+ List<OmKeyArgs> keyList1 =
+ createKeys(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, KEY_COUNT, 1, keyPrefix1, null);
+ // check there are keys in keyTable
+ assertEquals(KEY_COUNT, keyList1.size());
+ GenericTestUtils.waitFor(() -> getKeyCount(FILE_SYSTEM_OPTIMIZED) - initialKeyCount == KEY_COUNT,
+ WAIT_CHECK_INTERVAL, 1000);
+
+ if (keyPrefix2 != null) {
+ List<OmKeyArgs> keyList2 = new ArrayList<>();
+ for (int x = 0; x < KEY_COUNT; x++) {
+ final String keyName = uniqueObjectName(keyPrefix2);
+ OmKeyArgs keyArg = createAndCommitKey(volumeName, bucketName, keyName, 1, null);
+ keyList2.add(keyArg);
+ }
+ // check there are keys in keyTable
+ assertEquals(KEY_COUNT, keyList2.size());
+ GenericTestUtils.waitFor(() -> getKeyCount(FILE_SYSTEM_OPTIMIZED) - initialKeyCount == KEY_COUNT * 2,
+ WAIT_CHECK_INTERVAL, 1000);
+ }
+
+ // assert directory exists
+ KeyInfoWithVolumeContext keyInfo = getDirectory(volumeName, bucketName, dirName);
+ assertFalse(keyInfo.getKeyInfo().isFile());
+
+ KeyLifecycleService.setInjectors(
+ Arrays.asList(new FaultInjectorImpl(), new FaultInjectorImpl()));
+
+ // create Lifecycle configuration
+ ZonedDateTime now = ZonedDateTime.now(ZoneOffset.UTC);
+ ZonedDateTime date = now.plusSeconds(EXPIRE_SECONDS);
+ createLifecyclePolicy(volumeName, bucketName, FILE_SYSTEM_OPTIMIZED, rulePrefix, null, date.toString(), true);
+ LOG.info("expiry date {}", date.toInstant());
+
+ ZonedDateTime endDate = date.plus(SERVICE_INTERVAL, ChronoUnit.MILLIS);
+ GenericTestUtils.waitFor(() -> endDate.isBefore(ZonedDateTime.now(ZoneOffset.UTC)), WAIT_CHECK_INTERVAL, 10000);
+
+ // rename a key under the directory to change the directory's modification time
+ if (updateDirModificationTime) {
+ writeClient.renameKey(keyList1.get(0), keyList1.get(0).getKeyName() + "-new");
+ LOG.info("Dir {} refreshes its modification time", dirName);
+ KeyInfoWithVolumeContext keyInfo2 = getDirectory(volumeName, bucketName, dirName);
+ assertNotEquals(keyInfo.getKeyInfo().getModificationTime(), keyInfo2.getKeyInfo().getModificationTime());
+ }
+
+ // resume KeyLifecycleService bucket scan
+ KeyLifecycleService.getInjector(0).resume();
+ KeyLifecycleService.getInjector(1).resume();
+
+ GenericTestUtils.waitFor(() ->
+ (getDeletedKeyCount() - initialDeletedKeyCount) == expectedDeletedKeyCount, WAIT_CHECK_INTERVAL, 10000);
+ if (keyPrefix2 == null) {
+ assertEquals(0, getKeyCount(FILE_SYSTEM_OPTIMIZED) - initialKeyCount);
+ } else {
+ assertEquals(KEY_COUNT * 2 - expectedDeletedKeyCount, getKeyCount(FILE_SYSTEM_OPTIMIZED) - initialKeyCount);
+ }
+ GenericTestUtils.waitFor(() -> getDeletedDirectoryCount() - initialDeletedDirCount == expectedDeletedDirCount,
+ WAIT_CHECK_INTERVAL, 10000);
+ if (updateDirModificationTime) {
+ KeyInfoWithVolumeContext directory = getDirectory(volumeName, bucketName, dirName);
+ assertNotNull(directory);
+ } else {
+ assertThrows(OMException.class, () -> getDirectory(volumeName, bucketName, dirName));
+ }
+ deleteLifecyclePolicy(volumeName, bucketName);
+ }
}
/**
@@ -1609,7 +1735,7 @@ void testUnsupportedPrefixForFSO(String prefix, boolean createPrefix) {
private List<OmKeyArgs> createKeys(String volume, String bucket,
BucketLayout bucketLayout,
int keyCount, int numBlocks, String keyPrefix, Map<String, String> tags)
throws IOException {
- return createKeys(volume, bucket, bucketLayout, UserGroupInformation.getCurrentUser().getShortUserName(),
+ return createKeys(volume, bucket, bucketLayout, UserGroupInformation.getCurrentUser().getShortUserName(),
keyCount, numBlocks, keyPrefix, tags);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]