This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 63ee00df7a6 HDDS-13844. Decouple DirectoryDeletingService delete
batching from Ratis request size. (#9270)
63ee00df7a6 is described below
commit 63ee00df7a6c663f629e43fdbf615c8ffed74613
Author: Aryan Gupta <[email protected]>
AuthorDate: Fri Nov 21 01:48:49 2025 +0530
HDDS-13844. Decouple DirectoryDeletingService delete batching from Ratis
request size. (#9270)
---
.../common/src/main/resources/ozone-default.xml | 8 ++
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 5 +
.../TestDirectoryDeletingServiceWithFSO.java | 5 +-
.../apache/hadoop/ozone/om/DeleteKeysResult.java | 10 +-
.../org/apache/hadoop/ozone/om/KeyManager.java | 13 +--
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 38 +++----
.../ozone/om/service/DirectoryDeletingService.java | 124 ++++++++++++++-------
.../om/service/TestDirectoryDeletingService.java | 85 +++++++++++++-
8 files changed, 205 insertions(+), 83 deletions(-)
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 1e8df1c6747..5d36eb3b8f2 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -701,6 +701,14 @@
hdds.container.ratis.datanode.storage.dir be configured separately.
</description>
</property>
+ <property>
+ <name>ozone.path.deleting.limit.per.task</name>
+ <value>20000</value>
+ <tag>OZONE, PERFORMANCE, OM</tag>
+ <description>The maximum number of paths (dirs/files) to be deleted by the
directory deleting service per time interval.
+ </description>
+ </property>
<property>
<name>ozone.metadata.dirs.permissions</name>
<value>750</value>
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 469900aa8ea..ce00ec86b92 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -385,6 +385,11 @@ public final class OMConfigKeys {
public static final String OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT
= "60s";
+ public static final String OZONE_PATH_DELETING_LIMIT_PER_TASK =
+ "ozone.path.deleting.limit.per.task";
+ // default is 20000, taking into account the 32MB buffer size
+ public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 20000;
+
/**
* Configuration properties for Snapshot Directory Service.
*/
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
index 81e1dd3b444..eb77ac1dce3 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
@@ -41,6 +41,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;
import org.apache.commons.lang3.RandomStringUtils;
@@ -623,9 +624,9 @@ public void
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
}
return null;
}).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(),
- anyLong(), anyList(), anyList(), eq(null), anyLong(), anyLong(), any(),
+ anyLong(), anyList(), anyList(), eq(null), anyLong(), any(),
any(ReclaimableDirFilter.class), any(ReclaimableKeyFilter.class),
anyMap(), any(),
- anyLong());
+ anyLong(), any(AtomicInteger.class));
Mockito.doAnswer(i -> {
store.createSnapshot(testVolumeName, testBucketName, snap2);
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
index 60378467d6d..2b685edf273 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java
@@ -27,14 +27,11 @@
public class DeleteKeysResult {
private List<OmKeyInfo> keysToDelete;
- private long consumedSize;
private boolean processedKeys;
- public DeleteKeysResult(List<OmKeyInfo> keysToDelete,
- long consumedSize, boolean processedKeys) {
+ public DeleteKeysResult(List<OmKeyInfo> keysToDelete, boolean processedKeys)
{
this.keysToDelete = keysToDelete;
- this.consumedSize = consumedSize;
this.processedKeys = processedKeys;
}
@@ -42,11 +39,8 @@ public List<OmKeyInfo> getKeysToDelete() {
return keysToDelete;
}
- public long getConsumedSize() {
- return consumedSize;
- }
-
public boolean isProcessedKeys() {
return processedKeys;
}
+
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
index 872a99e94b1..b0562049f71 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java
@@ -306,9 +306,9 @@ default List<Table.KeyValue<String, OmKeyInfo>>
getDeletedDirEntries(String volu
* @return list of dirs
* @throws IOException
*/
- DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId,
- OmKeyInfo parentInfo, CheckedFunction<Table.KeyValue<String, OmKeyInfo>,
Boolean, IOException> filter,
- long remainingBufLimit) throws IOException;
+ DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId,
OmKeyInfo parentInfo,
+ CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter, int remainingNum)
+ throws IOException;
/**
* Returns all sub files under the given parent directory.
@@ -317,10 +317,9 @@ DeleteKeysResult getPendingDeletionSubDirs(long volumeId,
long bucketId,
* @return list of files
* @throws IOException
*/
- DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
- long bucketId, OmKeyInfo parentInfo,
- CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter, long remainingBufLimit)
- throws IOException;
+ DeleteKeysResult getPendingDeletionSubFiles(long volumeId, long bucketId,
OmKeyInfo parentInfo,
+ CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter, int remainingNum)
+ throws IOException;
/**
* Returns the instance of Directory Deleting Service.
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index eaca8ab2c52..19ef77d3457 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -2276,49 +2276,37 @@ private void slimLocationVersion(OmKeyInfo... keyInfos)
{
}
@Override
- public DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long
bucketId,
- OmKeyInfo parentInfo, CheckedFunction<KeyValue<String, OmKeyInfo>,
Boolean, IOException> filter,
- long remainingBufLimit) throws IOException {
+ public DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long
bucketId, OmKeyInfo parentInfo,
+ CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter, int remainingNum) throws IOException {
return gatherSubPathsWithIterator(volumeId, bucketId, parentInfo,
metadataManager.getDirectoryTable(),
kv ->
Table.newKeyValue(metadataManager.getOzoneDeletePathKey(kv.getValue().getObjectID(),
kv.getKey()),
- OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue())),
- filter, remainingBufLimit);
+ OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue())),
filter, remainingNum);
}
- private <T extends WithParentObjectId> DeleteKeysResult
gatherSubPathsWithIterator(
- long volumeId, long bucketId, OmKeyInfo parentInfo,
- Table<String, T> table,
+ private <T extends WithParentObjectId> DeleteKeysResult
gatherSubPathsWithIterator(long volumeId, long bucketId,
+ OmKeyInfo parentInfo, Table<String, T> table,
CheckedFunction<KeyValue<String, T>, KeyValue<String, OmKeyInfo>,
IOException> deleteKeyTransformer,
- CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException>
deleteKeyFilter,
- long remainingBufLimit) throws IOException {
+ CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException>
deleteKeyFilter, int remainingNum)
+ throws IOException {
List<OmKeyInfo> keyInfos = new ArrayList<>();
- String seekFileInDB = metadataManager.getOzonePathKey(volumeId, bucketId,
- parentInfo.getObjectID(), "");
- long consumedSize = 0;
+ String seekFileInDB = metadataManager.getOzonePathKey(volumeId, bucketId,
parentInfo.getObjectID(), "");
try (TableIterator<String, ? extends KeyValue<String, T>> iterator =
table.iterator(seekFileInDB)) {
- while (iterator.hasNext() && remainingBufLimit > 0) {
+ while (iterator.hasNext() && remainingNum > 0) {
KeyValue<String, T> entry = iterator.next();
- final long objectSerializedSize = entry.getValueByteSize();
- // No need to check the table again as the value in cache and iterator
would be same when directory
- // deleting service runs.
- if (remainingBufLimit - objectSerializedSize < 0) {
- break;
- }
KeyValue<String, OmKeyInfo> keyInfo =
deleteKeyTransformer.apply(entry);
if (deleteKeyFilter.apply(keyInfo)) {
keyInfos.add(keyInfo.getValue());
- remainingBufLimit -= objectSerializedSize;
- consumedSize += objectSerializedSize;
+ remainingNum--;
}
}
- return new DeleteKeysResult(keyInfos, consumedSize, !iterator.hasNext());
+ return new DeleteKeysResult(keyInfos, !iterator.hasNext());
}
}
@Override
public DeleteKeysResult getPendingDeletionSubFiles(long volumeId,
long bucketId, OmKeyInfo parentInfo,
- CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter, long remainingBufLimit)
+ CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException>
filter, int remainingNum)
throws IOException {
CheckedFunction<KeyValue<String, OmKeyInfo>, KeyValue<String, OmKeyInfo>,
IOException> tranformer = kv -> {
OmKeyInfo keyInfo = OMFileRequest.getKeyInfoWithFullPath(parentInfo,
kv.getValue());
@@ -2327,7 +2315,7 @@ public DeleteKeysResult getPendingDeletionSubFiles(long
volumeId,
return Table.newKeyValue(deleteKey, keyInfo);
};
return gatherSubPathsWithIterator(volumeId, bucketId, parentInfo,
metadataManager.getFileTable(), tranformer,
- filter, remainingBufLimit);
+ filter, remainingNum);
}
public boolean isBucketFSOptimized(String volName, String buckName)
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 45256a7ff05..a79eeda74f2 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -19,6 +19,8 @@
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT;
@@ -44,6 +46,7 @@
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -160,6 +163,7 @@ public class DirectoryDeletingService extends
AbstractKeyDeletingService {
private final AtomicLong deletedDirsCount;
private final AtomicLong movedDirsCount;
private final AtomicLong movedFilesCount;
+ private final int pathLimitPerTask;
public DirectoryDeletingService(long interval, TimeUnit unit,
long serviceTimeout, OzoneManager ozoneManager,
@@ -181,6 +185,8 @@ public DirectoryDeletingService(long interval, TimeUnit
unit,
this.deletedDirsCount = new AtomicLong(0);
this.movedDirsCount = new AtomicLong(0);
this.movedFilesCount = new AtomicLong(0);
+ this.pathLimitPerTask =
+ configuration.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK,
OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT);
}
public void registerReconfigCallbacks(ReconfigurationHandler handler) {
@@ -261,31 +267,28 @@ void optimizeDirDeletesAndSubmitRequest(
List<Pair<String, OmKeyInfo>> allSubDirList,
List<PurgePathRequest> purgePathRequestList,
String snapTableKey, long startTime,
- long remainingBufLimit, KeyManager keyManager,
+ KeyManager keyManager,
CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException>
reclaimableDirChecker,
CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException>
reclaimableFileChecker,
Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap,
- UUID expectedPreviousSnapshotId, long rnCnt) {
+ UUID expectedPreviousSnapshotId, long rnCnt, AtomicInteger remainNum) {
// Optimization to handle delete sub-dir and keys to remove quickly
// This case will be useful to handle when depth of directory is high
int subdirDelNum = 0;
int subDirRecursiveCnt = 0;
- int consumedSize = 0;
- while (subDirRecursiveCnt < allSubDirList.size() && remainingBufLimit > 0)
{
+ while (subDirRecursiveCnt < allSubDirList.size() && remainNum.get() > 0) {
try {
Pair<String, OmKeyInfo> stringOmKeyInfoPair =
allSubDirList.get(subDirRecursiveCnt++);
Boolean subDirectoryReclaimable =
reclaimableDirChecker.apply(Table.newKeyValue(stringOmKeyInfoPair.getKey(),
stringOmKeyInfoPair.getValue()));
Optional<PurgePathRequest> request = prepareDeleteDirRequest(
stringOmKeyInfoPair.getValue(), stringOmKeyInfoPair.getKey(),
subDirectoryReclaimable, allSubDirList,
- keyManager, reclaimableFileChecker, remainingBufLimit);
+ keyManager, reclaimableFileChecker, remainNum);
if (!request.isPresent()) {
continue;
}
PurgePathRequest requestVal = request.get();
- consumedSize += requestVal.getSerializedSize();
- remainingBufLimit -= consumedSize;
purgePathRequestList.add(requestVal);
// Count up the purgeDeletedDir, subDirs and subFiles
if (requestVal.hasDeletedDir() &&
!StringUtils.isBlank(requestVal.getDeletedDir())) {
@@ -300,7 +303,7 @@ void optimizeDirDeletesAndSubmitRequest(
}
}
if (!purgePathRequestList.isEmpty()) {
- submitPurgePaths(purgePathRequestList, snapTableKey,
expectedPreviousSnapshotId, bucketNameInfoMap);
+ submitPurgePathsWithBatching(purgePathRequestList, snapTableKey,
expectedPreviousSnapshotId, bucketNameInfoMap);
}
if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
@@ -378,7 +381,7 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest(
List<Pair<String, OmKeyInfo>> subDirList,
KeyManager keyManager,
CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException>
reclaimableFileFilter,
- long remainingBufLimit) throws IOException {
+ AtomicInteger remainNum) throws IOException {
// step-0: Get one pending deleted directory
if (LOG.isDebugEnabled()) {
LOG.debug("Pending deleted dir name: {}",
@@ -389,12 +392,13 @@ private Optional<PurgePathRequest>
prepareDeleteDirRequest(
.getVolumeBucketIdPairFSO(delDirName);
// step-1: get all sub directories under the deletedDir
+ int remainingNum = remainNum.get();
DeleteKeysResult subDirDeleteResult =
keyManager.getPendingDeletionSubDirs(volumeBucketId.getVolumeId(),
volumeBucketId.getBucketId(),
- pendingDeletedDirInfo, keyInfo -> true, remainingBufLimit);
+ pendingDeletedDirInfo, keyInfo -> true, remainingNum);
List<OmKeyInfo> subDirs = subDirDeleteResult.getKeysToDelete();
subDirs.forEach(omKeyInfo -> omKeyInfo.setAcls(Collections.emptyList()));
- remainingBufLimit -= subDirDeleteResult.getConsumedSize();
+ remainNum.addAndGet(-subDirs.size());
OMMetadataManager omMetadataManager = keyManager.getMetadataManager();
for (OmKeyInfo dirInfo : subDirs) {
@@ -408,11 +412,13 @@ private Optional<PurgePathRequest>
prepareDeleteDirRequest(
// step-2: get all sub files under the deletedDir
// Only remove sub files if the parent directory is going to be deleted or
can be reclaimed.
+ remainingNum = remainNum.get();
DeleteKeysResult subFileDeleteResult =
keyManager.getPendingDeletionSubFiles(volumeBucketId.getVolumeId(),
volumeBucketId.getBucketId(),
- pendingDeletedDirInfo, keyInfo -> purgeDir ||
reclaimableFileFilter.apply(keyInfo), remainingBufLimit);
+ pendingDeletedDirInfo, keyInfo -> purgeDir ||
reclaimableFileFilter.apply(keyInfo), remainingNum);
List<OmKeyInfo> subFiles = subFileDeleteResult.getKeysToDelete();
subFiles.forEach(omKeyInfo -> omKeyInfo.setAcls(Collections.emptyList()));
+ remainNum.addAndGet(-subFiles.size());
if (LOG.isDebugEnabled()) {
for (OmKeyInfo fileInfo : subFiles) {
@@ -422,11 +428,14 @@ private Optional<PurgePathRequest>
prepareDeleteDirRequest(
// step-3: If both sub-dirs and sub-files are exhausted under a parent
// directory, only then delete the parent.
- String purgeDeletedDir = purgeDir && subDirDeleteResult.isProcessedKeys()
&&
- subFileDeleteResult.isProcessedKeys() ? delDirName : null;
+ String purgeDeletedDir =
+ purgeDir && subDirDeleteResult.isProcessedKeys() &&
subFileDeleteResult.isProcessedKeys() ? delDirName : null;
if (purgeDeletedDir == null && subFiles.isEmpty() && subDirs.isEmpty()) {
return Optional.empty();
}
+ if (purgeDeletedDir != null) {
+ remainNum.addAndGet(-1);
+ }
return Optional.of(wrapPurgeRequest(volumeBucketId.getVolumeId(),
volumeBucketId.getBucketId(),
purgeDeletedDir, subFiles, subDirs));
}
@@ -461,8 +470,50 @@ private OzoneManagerProtocolProtos.PurgePathRequest
wrapPurgeRequest(
return purgePathsRequest.build();
}
- private OzoneManagerProtocolProtos.OMResponse
submitPurgePaths(List<PurgePathRequest> requests,
+ private List<OzoneManagerProtocolProtos.OMResponse>
submitPurgePathsWithBatching(List<PurgePathRequest> requests,
String snapTableKey, UUID expectedPreviousSnapshotId,
Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap) {
+
+ List<OzoneManagerProtocolProtos.OMResponse> responses = new ArrayList<>();
+ List<PurgePathRequest> purgePathRequestBatch = new ArrayList<>();
+ long batchBytes = 0;
+
+ for (PurgePathRequest req : requests) {
+ int reqSize = req.getSerializedSize();
+
+ // If adding this request would exceed the limit, flush the current
batch first
+ if (batchBytes + reqSize > ratisByteLimit &&
!purgePathRequestBatch.isEmpty()) {
+ OzoneManagerProtocolProtos.OMResponse resp =
+ submitPurgeRequest(snapTableKey, expectedPreviousSnapshotId,
bucketNameInfoMap, purgePathRequestBatch);
+ if (!resp.getSuccess()) {
+ return Collections.emptyList();
+ }
+ responses.add(resp);
+ purgePathRequestBatch.clear();
+ batchBytes = 0;
+ }
+
+ // Add current request to batch
+ purgePathRequestBatch.add(req);
+ batchBytes += reqSize;
+ }
+
+ // Flush remaining batch if any
+ if (!purgePathRequestBatch.isEmpty()) {
+ OzoneManagerProtocolProtos.OMResponse resp =
+ submitPurgeRequest(snapTableKey, expectedPreviousSnapshotId,
bucketNameInfoMap, purgePathRequestBatch);
+ if (!resp.getSuccess()) {
+ return Collections.emptyList();
+ }
+ responses.add(resp);
+ }
+
+ return responses;
+ }
+
+ @VisibleForTesting
+ OzoneManagerProtocolProtos.OMResponse submitPurgeRequest(String snapTableKey,
+ UUID expectedPreviousSnapshotId, Map<VolumeBucketId, BucketNameInfo>
bucketNameInfoMap,
+ List<PurgePathRequest> pathRequests) {
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest
=
OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
@@ -476,17 +527,14 @@ private OzoneManagerProtocolProtos.OMResponse
submitPurgePaths(List<PurgePathReq
}
purgeDirRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
- purgeDirRequest.addAllDeletedPath(requests);
-
purgeDirRequest.addAllBucketNameInfos(requests.stream().map(purgePathRequest ->
- new VolumeBucketId(purgePathRequest.getVolumeId(),
purgePathRequest.getBucketId())).distinct()
-
.map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList()));
+ purgeDirRequest.addAllDeletedPath(pathRequests);
+ purgeDirRequest.addAllBucketNameInfos(pathRequests.stream()
+ .map(purgePathRequest -> new
VolumeBucketId(purgePathRequest.getVolumeId(), purgePathRequest.getBucketId()))
+
.distinct().map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList()));
OzoneManagerProtocolProtos.OMRequest omRequest =
- OzoneManagerProtocolProtos.OMRequest.newBuilder()
- .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
- .setPurgeDirectoriesRequest(purgeDirRequest)
- .setClientId(getClientId().toString())
- .build();
+
OzoneManagerProtocolProtos.OMRequest.newBuilder().setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
+
.setPurgeDirectoriesRequest(purgeDirRequest).setClientId(getClientId().toString()).build();
// Submit Purge paths request to OM. Acquire bootstrap lock when
processing deletes for snapshots.
try {
@@ -528,8 +576,8 @@ private
OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ
* @param keyManager KeyManager of the underlying store.
*/
@VisibleForTesting
- void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo,
KeyManager keyManager,
- long remainingBufLimit, long rnCnt) throws IOException,
ExecutionException, InterruptedException {
+ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo,
KeyManager keyManager, long rnCnt, int remainNum)
+ throws IOException, ExecutionException, InterruptedException {
String volume, bucket; String snapshotTableKey;
if (currentSnapshotInfo != null) {
volume = currentSnapshotInfo.getVolumeName();
@@ -553,8 +601,8 @@ void processDeletedDirsForStore(SnapshotInfo
currentSnapshotInfo, KeyManager key
for (int i = 0; i < numberOfParallelThreadsPerStore; i++) {
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(()
-> {
try {
- return processDeletedDirectories(currentSnapshotInfo,
keyManager, dirSupplier, remainingBufLimit,
- expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt);
+ return processDeletedDirectories(currentSnapshotInfo,
keyManager, dirSupplier,
+ expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt,
remainNum);
} catch (Throwable e) {
return false;
}
@@ -594,16 +642,16 @@ void processDeletedDirsForStore(SnapshotInfo
currentSnapshotInfo, KeyManager key
* @param currentSnapshotInfo Information about the current snapshot whose
deleted directories are being processed.
* @param keyManager Key manager of the underlying storage system to
handle key operations.
* @param dirSupplier Supplier for fetching pending deleted directories to
be processed.
- * @param remainingBufLimit Remaining buffer limit for processing
directories and files.
* @param expectedPreviousSnapshotId The UUID of the previous snapshot
expected in the chain.
* @param totalExclusiveSizeMap A map for storing total exclusive size and
exclusive replicated size
* for each snapshot.
* @param runCount The number of times the processing task has been
executed.
+ * @param remaining Number of dirs to be processed.
* @return A boolean indicating whether the processed directory list is
empty.
*/
private boolean processDeletedDirectories(SnapshotInfo
currentSnapshotInfo, KeyManager keyManager,
- DeletedDirSupplier dirSupplier, long remainingBufLimit, UUID
expectedPreviousSnapshotId,
- Map<UUID, Pair<Long, Long>> totalExclusiveSizeMap, long runCount) {
+ DeletedDirSupplier dirSupplier, UUID expectedPreviousSnapshotId,
+ Map<UUID, Pair<Long, Long>> totalExclusiveSizeMap, long runCount, int
remaining) {
OmSnapshotManager omSnapshotManager =
getOzoneManager().getOmSnapshotManager();
IOzoneManagerLock lock =
getOzoneManager().getMetadataManager().getLock();
String snapshotTableKey = currentSnapshotInfo == null ? null :
currentSnapshotInfo.getTableKey();
@@ -615,12 +663,12 @@ private boolean processDeletedDirectories(SnapshotInfo
currentSnapshotInfo, KeyM
long dirNum = 0L;
long subDirNum = 0L;
long subFileNum = 0L;
- int consumedSize = 0;
List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
Map<VolumeBucketId, BucketNameInfo> bucketNameInfos = new HashMap<>();
+ AtomicInteger remainNum = new AtomicInteger(remaining);
List<Pair<String, OmKeyInfo>> allSubDirList = new ArrayList<>();
- while (remainingBufLimit > 0) {
+ while (remainNum.get() > 0) {
KeyValue<String, OmKeyInfo> pendingDeletedDirInfo =
dirSupplier.get();
if (pendingDeletedDirInfo == null) {
break;
@@ -639,13 +687,11 @@ private boolean processDeletedDirectories(SnapshotInfo
currentSnapshotInfo, KeyM
Optional<PurgePathRequest> request = prepareDeleteDirRequest(
pendingDeletedDirInfo.getValue(),
pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList,
- getOzoneManager().getKeyManager(), reclaimableFileFilter,
remainingBufLimit);
+ getOzoneManager().getKeyManager(), reclaimableFileFilter,
remainNum);
if (!request.isPresent()) {
continue;
}
PurgePathRequest purgePathRequest = request.get();
- consumedSize += purgePathRequest.getSerializedSize();
- remainingBufLimit -= consumedSize;
purgePathRequestList.add(purgePathRequest);
// Count up the purgeDeletedDir, subDirs and subFiles
if (purgePathRequest.hasDeletedDir() &&
!StringUtils.isBlank(purgePathRequest.getDeletedDir())) {
@@ -657,9 +703,9 @@ private boolean processDeletedDirectories(SnapshotInfo
currentSnapshotInfo, KeyM
optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
- startTime, remainingBufLimit, getOzoneManager().getKeyManager(),
+ startTime, getOzoneManager().getKeyManager(),
reclaimableDirFilter, reclaimableFileFilter, bucketNameInfos,
expectedPreviousSnapshotId,
- runCount);
+ runCount, remainNum);
Map<UUID, Long> exclusiveReplicatedSizeMap =
reclaimableFileFilter.getExclusiveReplicatedSizeMap();
Map<UUID, Long> exclusiveSizeMap =
reclaimableFileFilter.getExclusiveSizeMap();
List<UUID> previousPathSnapshotsInChain =
@@ -719,7 +765,7 @@ public BackgroundTaskResult call() {
snapInfo.getName())) {
KeyManager keyManager = snapInfo == null ?
getOzoneManager().getKeyManager()
: omSnapshot.get().getKeyManager();
- processDeletedDirsForStore(snapInfo, keyManager, ratisByteLimit,
run);
+ processDeletedDirsForStore(snapInfo, keyManager, run,
pathLimitPerTask);
}
} catch (IOException | ExecutionException e) {
LOG.error("Error while running delete files background task for
store {}. Will retry at next run.",
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index 9fabe5a4650..06b70dca905 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -24,20 +24,26 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
+import static org.mockito.Mockito.mockStatic;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.db.DBConfigFromFile;
import org.apache.hadoop.ozone.om.KeyManager;
@@ -51,11 +57,13 @@
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.util.Time;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.ExitUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.MockedStatic;
@@ -185,7 +193,7 @@ public void testMultithreadedDirectoryDeletion() throws
Exception {
= new OmTestManagers(conf);
OzoneManager ozoneManager = omTestManagers.getOzoneManager();
AtomicBoolean isRunning = new AtomicBoolean(true);
- try (MockedStatic mockedStatic =
Mockito.mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) {
+ try (MockedStatic mockedStatic = mockStatic(CompletableFuture.class,
CALLS_REAL_METHODS)) {
List<Pair<Supplier, CompletableFuture>> futureList = new ArrayList<>();
Thread deletionThread = new Thread(() -> {
while (futureList.size() < threadCount) {
@@ -221,7 +229,7 @@ public void testMultithreadedDirectoryDeletion() throws
Exception {
DirectoryDeletingService.DirDeletingTask dirDeletingTask =
ozoneManager.getKeyManager().getDirDeletingService().new
DirDeletingTask(null);
- dirDeletingTask.processDeletedDirsForStore(null,
ozoneManager.getKeyManager(), Long.MAX_VALUE, 1);
+ dirDeletingTask.processDeletedDirsForStore(null,
ozoneManager.getKeyManager(), 1, 6000);
assertThat(futureList).hasSize(threadCount);
for (Pair<Supplier, CompletableFuture> pair : futureList) {
assertTrue(pair.getRight().isDone());
@@ -231,4 +239,77 @@ public void testMultithreadedDirectoryDeletion() throws
Exception {
ozoneManager.stop();
}
}
+
+ @Test
+ @DisplayName("DirectoryDeletingService batches PurgeDirectories by Ratis
byte limit (via submitRequest spy)")
+ void testPurgeDirectoriesBatching() throws Exception {
+ final int ratisLimitBytes = 2304;
+
+ OzoneConfiguration conf = new OzoneConfiguration();
+ File testDir = Files.createTempDirectory("TestDDS-SubmitSpy").toFile();
+ ServerUtils.setOzoneMetaDirPath(conf, testDir.toString());
+ conf.setTimeDuration(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL,
100, TimeUnit.MILLISECONDS);
+
conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
ratisLimitBytes, StorageUnit.BYTES);
+ conf.setQuietMode(false);
+
+ OmTestManagers managers = new OmTestManagers(conf);
+ om = managers.getOzoneManager();
+ KeyManager km = managers.getKeyManager();
+
+ DirectoryDeletingService real = (DirectoryDeletingService)
km.getDirDeletingService();
+ DirectoryDeletingService dds = Mockito.spy(real);
+
+ List<OzoneManagerProtocolProtos.OMRequest> captured = new ArrayList<>();
+ Mockito.doAnswer(inv -> {
+ OzoneManagerProtocolProtos.OMRequest req = inv.getArgument(0);
+ captured.add(req);
+ return OzoneManagerProtocolProtos.OMResponse.newBuilder()
+
.setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories).setStatus(OzoneManagerProtocolProtos.Status.OK)
+ .build();
+
}).when(dds).submitRequest(Mockito.any(OzoneManagerProtocolProtos.OMRequest.class));
+
+ final long volumeId = 1L, bucketId = 2L;
+ List<OzoneManagerProtocolProtos.PurgePathRequest> purgeList = new
ArrayList<>();
+
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < 30; i++) {
+ sb.append("0123456789");
+ }
+ final String longSuffix = sb.toString();
+
+ for (int i = 0; i < 20; i++) {
+
purgeList.add(OzoneManagerProtocolProtos.PurgePathRequest.newBuilder().setVolumeId(volumeId).setBucketId(bucketId)
+ .setDeletedDir("dir-" + longSuffix + "-" + i).build());
+ }
+
+ org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId vbId =
+ new
org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId(volumeId, bucketId);
+ OzoneManagerProtocolProtos.BucketNameInfo bni =
+
OzoneManagerProtocolProtos.BucketNameInfo.newBuilder().setVolumeId(volumeId).setBucketId(bucketId)
+ .setVolumeName("v").setBucketName("b").build();
+ Map<org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId,
OzoneManagerProtocolProtos.BucketNameInfo>
+ bucketNameInfoMap = new HashMap<>();
+ bucketNameInfoMap.put(vbId, bni);
+
+ dds.optimizeDirDeletesAndSubmitRequest(0L, 0L, 0L, new ArrayList<>(),
purgeList, null, Time.monotonicNow(), km,
+ kv -> true, kv -> true, bucketNameInfoMap, null, 1L, new
AtomicInteger(Integer.MAX_VALUE));
+
+ assertThat(captured.size())
+ .as("Expect batching to respect Ratis byte limit")
+ .isBetween(3, 5);
+
+ for (OzoneManagerProtocolProtos.OMRequest omReq : captured) {
+
assertThat(omReq.getCmdType()).isEqualTo(OzoneManagerProtocolProtos.Type.PurgeDirectories);
+
+ OzoneManagerProtocolProtos.PurgeDirectoriesRequest purge =
omReq.getPurgeDirectoriesRequest();
+ int payloadBytes =
+
purge.getDeletedPathList().stream().mapToInt(OzoneManagerProtocolProtos.PurgePathRequest::getSerializedSize)
+ .sum();
+
+ assertThat(payloadBytes).as("Batch size should respect Ratis byte
limit").isLessThanOrEqualTo(ratisLimitBytes);
+ }
+
+ org.apache.commons.io.FileUtils.deleteDirectory(testDir);
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]