This is an automated email from the ASF dual-hosted git repository. weichiu pushed a commit to branch ozone-2.1 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit d31c181e7f340380c16452a0b77dd849630e4c97 Author: Aryan Gupta <[email protected]> AuthorDate: Fri Nov 21 01:48:49 2025 +0530 HDDS-13844. Decouple DirectoryDeletingService delete batching from Ratis request size. (#9270) (cherry picked from commit 63ee00df7a6c663f629e43fdbf615c8ffed74613) Conflicts: hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java Change-Id: If65edcf62195323d437cdb4a5500d82578bc59a0 --- .../common/src/main/resources/ozone-default.xml | 8 ++ .../org/apache/hadoop/ozone/om/OMConfigKeys.java | 5 + .../TestDirectoryDeletingServiceWithFSO.java | 5 +- .../apache/hadoop/ozone/om/DeleteKeysResult.java | 10 +- .../org/apache/hadoop/ozone/om/KeyManager.java | 13 +-- .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 38 +++---- .../ozone/om/service/DirectoryDeletingService.java | 125 ++++++++++++++------- .../om/service/TestDirectoryDeletingService.java | 85 +++++++++++++- 8 files changed, 206 insertions(+), 83 deletions(-) diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 462b1e4331f..0a2e16a5a73 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -689,6 +689,14 @@ hdds.container.ratis.datanode.storage.dir be configured separately. </description> </property> + <property> + <name>ozone.path.deleting.limit.per.task</name> + <value>20000</value> + <tag>OZONE, PERFORMANCE, OM</tag> + <description>A maximum number of paths(dirs/files) to be deleted by + directory deleting service per time interval. 
+ </description> + </property> <property> <name>ozone.metadata.dirs.permissions</name> <value>750</value> diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index 254a49ea9a9..3ce10ec1d14 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -388,6 +388,11 @@ public final class OMConfigKeys { public static final String OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT = "60s"; + public static final String OZONE_PATH_DELETING_LIMIT_PER_TASK = + "ozone.path.deleting.limit.per.task"; + // default is 20000 taking account of 32MB buffer size + public static final int OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT = 20000; + /** * Configuration properties for Snapshot Directory Service. */ diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java index 81e1dd3b444..eb77ac1dce3 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java @@ -41,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.LongSupplier; import org.apache.commons.lang3.RandomStringUtils; @@ -623,9 +624,9 @@ public void testAOSKeyDeletingWithSnapshotCreateParallelExecution() } return null; }).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(), - 
anyLong(), anyList(), anyList(), eq(null), anyLong(), anyLong(), any(), + anyLong(), anyList(), anyList(), eq(null), anyLong(), any(), any(ReclaimableDirFilter.class), any(ReclaimableKeyFilter.class), anyMap(), any(), - anyLong()); + anyLong(), any(AtomicInteger.class)); Mockito.doAnswer(i -> { store.createSnapshot(testVolumeName, testBucketName, snap2); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java index 60378467d6d..2b685edf273 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/DeleteKeysResult.java @@ -27,14 +27,11 @@ public class DeleteKeysResult { private List<OmKeyInfo> keysToDelete; - private long consumedSize; private boolean processedKeys; - public DeleteKeysResult(List<OmKeyInfo> keysToDelete, - long consumedSize, boolean processedKeys) { + public DeleteKeysResult(List<OmKeyInfo> keysToDelete, boolean processedKeys) { this.keysToDelete = keysToDelete; - this.consumedSize = consumedSize; this.processedKeys = processedKeys; } @@ -42,11 +39,8 @@ public List<OmKeyInfo> getKeysToDelete() { return keysToDelete; } - public long getConsumedSize() { - return consumedSize; - } - public boolean isProcessedKeys() { return processedKeys; } + } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java index 872a99e94b1..b0562049f71 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManager.java @@ -306,9 +306,9 @@ default List<Table.KeyValue<String, OmKeyInfo>> getDeletedDirEntries(String volu * @return list of dirs * @throws IOException */ - 
DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId, - OmKeyInfo parentInfo, CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, - long remainingBufLimit) throws IOException; + DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId, OmKeyInfo parentInfo, + CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, int remainingNum) + throws IOException; /** * Returns all sub files under the given parent directory. @@ -317,10 +317,9 @@ DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId, * @return list of files * @throws IOException */ - DeleteKeysResult getPendingDeletionSubFiles(long volumeId, - long bucketId, OmKeyInfo parentInfo, - CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, long remainingBufLimit) - throws IOException; + DeleteKeysResult getPendingDeletionSubFiles(long volumeId, long bucketId, OmKeyInfo parentInfo, + CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, int remainingNum) + throws IOException; /** * Returns the instance of Directory Deleting Service. diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index e458fa73236..7ad2f6d2e72 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -2264,49 +2264,37 @@ private void slimLocationVersion(OmKeyInfo... 
keyInfos) { } @Override - public DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId, - OmKeyInfo parentInfo, CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, - long remainingBufLimit) throws IOException { + public DeleteKeysResult getPendingDeletionSubDirs(long volumeId, long bucketId, OmKeyInfo parentInfo, + CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, int remainingNum) throws IOException { return gatherSubPathsWithIterator(volumeId, bucketId, parentInfo, metadataManager.getDirectoryTable(), kv -> Table.newKeyValue(metadataManager.getOzoneDeletePathKey(kv.getValue().getObjectID(), kv.getKey()), - OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue())), - filter, remainingBufLimit); + OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue())), filter, remainingNum); } - private <T extends WithParentObjectId> DeleteKeysResult gatherSubPathsWithIterator( - long volumeId, long bucketId, OmKeyInfo parentInfo, - Table<String, T> table, + private <T extends WithParentObjectId> DeleteKeysResult gatherSubPathsWithIterator(long volumeId, long bucketId, + OmKeyInfo parentInfo, Table<String, T> table, CheckedFunction<KeyValue<String, T>, KeyValue<String, OmKeyInfo>, IOException> deleteKeyTransformer, - CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> deleteKeyFilter, - long remainingBufLimit) throws IOException { + CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> deleteKeyFilter, int remainingNum) + throws IOException { List<OmKeyInfo> keyInfos = new ArrayList<>(); - String seekFileInDB = metadataManager.getOzonePathKey(volumeId, bucketId, - parentInfo.getObjectID(), ""); - long consumedSize = 0; + String seekFileInDB = metadataManager.getOzonePathKey(volumeId, bucketId, parentInfo.getObjectID(), ""); try (TableIterator<String, ? 
extends KeyValue<String, T>> iterator = table.iterator(seekFileInDB)) { - while (iterator.hasNext() && remainingBufLimit > 0) { + while (iterator.hasNext() && remainingNum > 0) { KeyValue<String, T> entry = iterator.next(); - final long objectSerializedSize = entry.getValueByteSize(); - // No need to check the table again as the value in cache and iterator would be same when directory - // deleting service runs. - if (remainingBufLimit - objectSerializedSize < 0) { - break; - } KeyValue<String, OmKeyInfo> keyInfo = deleteKeyTransformer.apply(entry); if (deleteKeyFilter.apply(keyInfo)) { keyInfos.add(keyInfo.getValue()); - remainingBufLimit -= objectSerializedSize; - consumedSize += objectSerializedSize; + remainingNum--; } } - return new DeleteKeysResult(keyInfos, consumedSize, !iterator.hasNext()); + return new DeleteKeysResult(keyInfos, !iterator.hasNext()); } } @Override public DeleteKeysResult getPendingDeletionSubFiles(long volumeId, long bucketId, OmKeyInfo parentInfo, - CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, long remainingBufLimit) + CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> filter, int remainingNum) throws IOException { CheckedFunction<KeyValue<String, OmKeyInfo>, KeyValue<String, OmKeyInfo>, IOException> tranformer = kv -> { OmKeyInfo keyInfo = OMFileRequest.getKeyInfoWithFullPath(parentInfo, kv.getValue()); @@ -2315,7 +2303,7 @@ public DeleteKeysResult getPendingDeletionSubFiles(long volumeId, return Table.newKeyValue(deleteKey, keyInfo); }; return gatherSubPathsWithIterator(volumeId, bucketId, parentInfo, metadataManager.getFileTable(), tranformer, - filter, remainingBufLimit); + filter, remainingNum); } public boolean isBucketFSOptimized(String volName, String buckName) diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java index 001e686455f..f73ee72e595 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java @@ -19,6 +19,8 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION; import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION_DEFAULT; @@ -29,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -43,6 +46,7 @@ import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -161,6 +165,7 @@ public class DirectoryDeletingService extends AbstractKeyDeletingService { private final AtomicLong deletedDirsCount; private final AtomicLong movedDirsCount; private final AtomicLong movedFilesCount; + private final int pathLimitPerTask; public DirectoryDeletingService(long interval, TimeUnit unit, long serviceTimeout, OzoneManager ozoneManager, @@ -182,6 +187,8 @@ public DirectoryDeletingService(long interval, TimeUnit unit, this.deletedDirsCount = new AtomicLong(0); this.movedDirsCount = 
new AtomicLong(0); this.movedFilesCount = new AtomicLong(0); + this.pathLimitPerTask = + configuration.getInt(OZONE_PATH_DELETING_LIMIT_PER_TASK, OZONE_PATH_DELETING_LIMIT_PER_TASK_DEFAULT); } public void registerReconfigCallbacks(ReconfigurationHandler handler) { @@ -262,31 +269,28 @@ void optimizeDirDeletesAndSubmitRequest( List<Pair<String, OmKeyInfo>> allSubDirList, List<PurgePathRequest> purgePathRequestList, String snapTableKey, long startTime, - long remainingBufLimit, KeyManager keyManager, + KeyManager keyManager, CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> reclaimableDirChecker, CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> reclaimableFileChecker, Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap, - UUID expectedPreviousSnapshotId, long rnCnt) throws InterruptedException { + UUID expectedPreviousSnapshotId, long rnCnt, AtomicInteger remainNum) throws InterruptedException { // Optimization to handle delete sub-dir and keys to remove quickly // This case will be useful to handle when depth of directory is high int subdirDelNum = 0; int subDirRecursiveCnt = 0; - int consumedSize = 0; - while (subDirRecursiveCnt < allSubDirList.size() && remainingBufLimit > 0) { + while (subDirRecursiveCnt < allSubDirList.size() && remainNum.get() > 0) { try { Pair<String, OmKeyInfo> stringOmKeyInfoPair = allSubDirList.get(subDirRecursiveCnt++); Boolean subDirectoryReclaimable = reclaimableDirChecker.apply(Table.newKeyValue(stringOmKeyInfoPair.getKey(), stringOmKeyInfoPair.getValue())); Optional<PurgePathRequest> request = prepareDeleteDirRequest( stringOmKeyInfoPair.getValue(), stringOmKeyInfoPair.getKey(), subDirectoryReclaimable, allSubDirList, - keyManager, reclaimableFileChecker, remainingBufLimit); + keyManager, reclaimableFileChecker, remainNum); if (!request.isPresent()) { continue; } PurgePathRequest requestVal = request.get(); - consumedSize += requestVal.getSerializedSize(); - remainingBufLimit -= consumedSize; 
purgePathRequestList.add(requestVal); // Count up the purgeDeletedDir, subDirs and subFiles if (requestVal.hasDeletedDir() && !StringUtils.isBlank(requestVal.getDeletedDir())) { @@ -301,7 +305,7 @@ void optimizeDirDeletesAndSubmitRequest( } } if (!purgePathRequestList.isEmpty()) { - submitPurgePaths(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId, bucketNameInfoMap); + submitPurgePathsWithBatching(purgePathRequestList, snapTableKey, expectedPreviousSnapshotId, bucketNameInfoMap); } if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) { @@ -379,7 +383,7 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest( List<Pair<String, OmKeyInfo>> subDirList, KeyManager keyManager, CheckedFunction<Table.KeyValue<String, OmKeyInfo>, Boolean, IOException> reclaimableFileFilter, - long remainingBufLimit) throws IOException { + AtomicInteger remainNum) throws IOException { // step-0: Get one pending deleted directory if (LOG.isDebugEnabled()) { LOG.debug("Pending deleted dir name: {}", @@ -390,11 +394,12 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest( .getVolumeBucketIdPairFSO(delDirName); // step-1: get all sub directories under the deletedDir + int remainingNum = remainNum.get(); DeleteKeysResult subDirDeleteResult = keyManager.getPendingDeletionSubDirs(volumeBucketId.getVolumeId(), volumeBucketId.getBucketId(), - pendingDeletedDirInfo, keyInfo -> true, remainingBufLimit); + pendingDeletedDirInfo, keyInfo -> true, remainingNum); List<OmKeyInfo> subDirs = subDirDeleteResult.getKeysToDelete(); - remainingBufLimit -= subDirDeleteResult.getConsumedSize(); + remainNum.addAndGet(-subDirs.size()); OMMetadataManager omMetadataManager = keyManager.getMetadataManager(); for (OmKeyInfo dirInfo : subDirs) { @@ -408,10 +413,12 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest( // step-2: get all sub files under the deletedDir // Only remove sub files if the parent directory is going to be deleted or can be reclaimed. 
+ remainingNum = remainNum.get(); DeleteKeysResult subFileDeleteResult = keyManager.getPendingDeletionSubFiles(volumeBucketId.getVolumeId(), volumeBucketId.getBucketId(), - pendingDeletedDirInfo, keyInfo -> purgeDir || reclaimableFileFilter.apply(keyInfo), remainingBufLimit); + pendingDeletedDirInfo, keyInfo -> purgeDir || reclaimableFileFilter.apply(keyInfo), remainingNum); List<OmKeyInfo> subFiles = subFileDeleteResult.getKeysToDelete(); + remainNum.addAndGet(-subFiles.size()); if (LOG.isDebugEnabled()) { for (OmKeyInfo fileInfo : subFiles) { @@ -421,11 +428,14 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest( // step-3: If both sub-dirs and sub-files are exhausted under a parent // directory, only then delete the parent. - String purgeDeletedDir = purgeDir && subDirDeleteResult.isProcessedKeys() && - subFileDeleteResult.isProcessedKeys() ? delDirName : null; + String purgeDeletedDir = + purgeDir && subDirDeleteResult.isProcessedKeys() && subFileDeleteResult.isProcessedKeys() ? 
delDirName : null; if (purgeDeletedDir == null && subFiles.isEmpty() && subDirs.isEmpty()) { return Optional.empty(); } + if (purgeDeletedDir != null) { + remainNum.addAndGet(-1); + } return Optional.of(wrapPurgeRequest(volumeBucketId.getVolumeId(), volumeBucketId.getBucketId(), purgeDeletedDir, subFiles, subDirs)); } @@ -460,9 +470,51 @@ private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest( return purgePathsRequest.build(); } - private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List<PurgePathRequest> requests, + private List<OzoneManagerProtocolProtos.OMResponse> submitPurgePathsWithBatching(List<PurgePathRequest> requests, String snapTableKey, UUID expectedPreviousSnapshotId, Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap) throws InterruptedException { + + List<OzoneManagerProtocolProtos.OMResponse> responses = new ArrayList<>(); + List<PurgePathRequest> purgePathRequestBatch = new ArrayList<>(); + long batchBytes = 0; + + for (PurgePathRequest req : requests) { + int reqSize = req.getSerializedSize(); + + // If adding this request would exceed the limit, flush the current batch first + if (batchBytes + reqSize > ratisByteLimit && !purgePathRequestBatch.isEmpty()) { + OzoneManagerProtocolProtos.OMResponse resp = + submitPurgeRequest(snapTableKey, expectedPreviousSnapshotId, bucketNameInfoMap, purgePathRequestBatch); + if (!resp.getSuccess()) { + return Collections.emptyList(); + } + responses.add(resp); + purgePathRequestBatch.clear(); + batchBytes = 0; + } + + // Add current request to batch + purgePathRequestBatch.add(req); + batchBytes += reqSize; + } + + // Flush remaining batch if any + if (!purgePathRequestBatch.isEmpty()) { + OzoneManagerProtocolProtos.OMResponse resp = + submitPurgeRequest(snapTableKey, expectedPreviousSnapshotId, bucketNameInfoMap, purgePathRequestBatch); + if (!resp.getSuccess()) { + return Collections.emptyList(); + } + responses.add(resp); + } + + return responses; + } + + @VisibleForTesting + 
OzoneManagerProtocolProtos.OMResponse submitPurgeRequest(String snapTableKey, + UUID expectedPreviousSnapshotId, Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap, + List<PurgePathRequest> pathRequests) throws InterruptedException { OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest = OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder(); @@ -476,17 +528,14 @@ private OzoneManagerProtocolProtos.OMResponse submitPurgePaths(List<PurgePathReq } purgeDirRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build()); - purgeDirRequest.addAllDeletedPath(requests); - purgeDirRequest.addAllBucketNameInfos(requests.stream().map(purgePathRequest -> - new VolumeBucketId(purgePathRequest.getVolumeId(), purgePathRequest.getBucketId())).distinct() - .map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList())); + purgeDirRequest.addAllDeletedPath(pathRequests); + purgeDirRequest.addAllBucketNameInfos(pathRequests.stream() + .map(purgePathRequest -> new VolumeBucketId(purgePathRequest.getVolumeId(), purgePathRequest.getBucketId())) + .distinct().map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList())); OzoneManagerProtocolProtos.OMRequest omRequest = - OzoneManagerProtocolProtos.OMRequest.newBuilder() - .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories) - .setPurgeDirectoriesRequest(purgeDirRequest) - .setClientId(getClientId().toString()) - .build(); + OzoneManagerProtocolProtos.OMRequest.newBuilder().setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories) + .setPurgeDirectoriesRequest(purgeDirRequest).setClientId(getClientId().toString()).build(); // Submit Purge paths request to OM. Acquire bootstrap lock when processing deletes for snapshots. try (BootstrapStateHandler.Lock lock = snapTableKey != null ? 
getBootstrapStateLock().lock() : null) { @@ -528,8 +577,8 @@ private OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ * @param keyManager KeyManager of the underlying store. */ @VisibleForTesting - void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, - long remainingBufLimit, long rnCnt) throws IOException, ExecutionException, InterruptedException { + void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, long rnCnt, int remainNum) + throws IOException, ExecutionException, InterruptedException { String volume, bucket; String snapshotTableKey; if (currentSnapshotInfo != null) { volume = currentSnapshotInfo.getVolumeName(); @@ -553,8 +602,8 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key for (int i = 0; i < numberOfParallelThreadsPerStore; i++) { CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(() -> { try { - return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, remainingBufLimit, - expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt); + return processDeletedDirectories(currentSnapshotInfo, keyManager, dirSupplier, + expectedPreviousSnapshotId, exclusiveSizeMap, rnCnt, remainNum); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return false; @@ -597,16 +646,16 @@ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, KeyManager key * @param currentSnapshotInfo Information about the current snapshot whose deleted directories are being processed. * @param keyManager Key manager of the underlying storage system to handle key operations. * @param dirSupplier Supplier for fetching pending deleted directories to be processed. - * @param remainingBufLimit Remaining buffer limit for processing directories and files. * @param expectedPreviousSnapshotId The UUID of the previous snapshot expected in the chain. 
* @param totalExclusiveSizeMap A map for storing total exclusive size and exclusive replicated size * for each snapshot. * @param runCount The number of times the processing task has been executed. + * @param remaining Maximum number of paths (directories and files) this call may gather for deletion. * @return A boolean indicating whether the processed directory list is empty. */ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyManager keyManager, - DeletedDirSupplier dirSupplier, long remainingBufLimit, UUID expectedPreviousSnapshotId, - Map<UUID, Pair<Long, Long>> totalExclusiveSizeMap, long runCount) throws InterruptedException { + DeletedDirSupplier dirSupplier, UUID expectedPreviousSnapshotId, + Map<UUID, Pair<Long, Long>> totalExclusiveSizeMap, long runCount, int remaining) throws InterruptedException { OmSnapshotManager omSnapshotManager = getOzoneManager().getOmSnapshotManager(); IOzoneManagerLock lock = getOzoneManager().getMetadataManager().getLock(); String snapshotTableKey = currentSnapshotInfo == null ? 
null : currentSnapshotInfo.getTableKey(); @@ -618,12 +667,12 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM long dirNum = 0L; long subDirNum = 0L; long subFileNum = 0L; - int consumedSize = 0; List<PurgePathRequest> purgePathRequestList = new ArrayList<>(); Map<VolumeBucketId, BucketNameInfo> bucketNameInfos = new HashMap<>(); + AtomicInteger remainNum = new AtomicInteger(remaining); List<Pair<String, OmKeyInfo>> allSubDirList = new ArrayList<>(); - while (remainingBufLimit > 0) { + while (remainNum.get() > 0) { KeyValue<String, OmKeyInfo> pendingDeletedDirInfo = dirSupplier.get(); if (pendingDeletedDirInfo == null) { break; @@ -642,13 +691,11 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM Optional<PurgePathRequest> request = prepareDeleteDirRequest( pendingDeletedDirInfo.getValue(), pendingDeletedDirInfo.getKey(), isDirReclaimable, allSubDirList, - getOzoneManager().getKeyManager(), reclaimableFileFilter, remainingBufLimit); + getOzoneManager().getKeyManager(), reclaimableFileFilter, remainNum); if (!request.isPresent()) { continue; } PurgePathRequest purgePathRequest = request.get(); - consumedSize += purgePathRequest.getSerializedSize(); - remainingBufLimit -= consumedSize; purgePathRequestList.add(purgePathRequest); // Count up the purgeDeletedDir, subDirs and subFiles if (purgePathRequest.hasDeletedDir() && !StringUtils.isBlank(purgePathRequest.getDeletedDir())) { @@ -660,9 +707,9 @@ private boolean processDeletedDirectories(SnapshotInfo currentSnapshotInfo, KeyM optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum, subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey, - startTime, remainingBufLimit, getOzoneManager().getKeyManager(), + startTime, getOzoneManager().getKeyManager(), reclaimableDirFilter, reclaimableFileFilter, bucketNameInfos, expectedPreviousSnapshotId, - runCount); + runCount, remainNum); Map<UUID, Long> exclusiveReplicatedSizeMap = 
reclaimableFileFilter.getExclusiveReplicatedSizeMap(); Map<UUID, Long> exclusiveSizeMap = reclaimableFileFilter.getExclusiveSizeMap(); List<UUID> previousPathSnapshotsInChain = @@ -722,7 +769,7 @@ public BackgroundTaskResult call() { snapInfo.getName())) { KeyManager keyManager = snapInfo == null ? getOzoneManager().getKeyManager() : omSnapshot.get().getKeyManager(); - processDeletedDirsForStore(snapInfo, keyManager, ratisByteLimit, run); + processDeletedDirsForStore(snapInfo, keyManager, run, pathLimitPerTask); } } catch (IOException | ExecutionException e) { LOG.error("Error while running delete files background task for store {}. Will retry at next run.", diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java index 9fabe5a4650..06b70dca905 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java @@ -24,20 +24,26 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.CALLS_REAL_METHODS; +import static org.mockito.Mockito.mockStatic; import java.io.File; import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.hdds.client.RatisReplicationConfig; import org.apache.hadoop.hdds.client.StandaloneReplicationConfig; 
import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.hdds.server.ServerUtils; import org.apache.hadoop.hdds.utils.db.DBConfigFromFile; import org.apache.hadoop.ozone.om.KeyManager; @@ -51,11 +57,13 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.request.OMRequestTestUtils; +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos; import org.apache.hadoop.util.Time; import org.apache.ozone.test.GenericTestUtils; import org.apache.ratis.util.ExitUtils; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.mockito.MockedStatic; @@ -185,7 +193,7 @@ public void testMultithreadedDirectoryDeletion() throws Exception { = new OmTestManagers(conf); OzoneManager ozoneManager = omTestManagers.getOzoneManager(); AtomicBoolean isRunning = new AtomicBoolean(true); - try (MockedStatic mockedStatic = Mockito.mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) { + try (MockedStatic mockedStatic = mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) { List<Pair<Supplier, CompletableFuture>> futureList = new ArrayList<>(); Thread deletionThread = new Thread(() -> { while (futureList.size() < threadCount) { @@ -221,7 +229,7 @@ public void testMultithreadedDirectoryDeletion() throws Exception { DirectoryDeletingService.DirDeletingTask dirDeletingTask = ozoneManager.getKeyManager().getDirDeletingService().new DirDeletingTask(null); - dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), Long.MAX_VALUE, 1); + dirDeletingTask.processDeletedDirsForStore(null, ozoneManager.getKeyManager(), 1, 6000); assertThat(futureList).hasSize(threadCount); for (Pair<Supplier, CompletableFuture> pair : futureList) { 
assertTrue(pair.getRight().isDone()); @@ -231,4 +239,77 @@ public void testMultithreadedDirectoryDeletion() throws Exception { ozoneManager.stop(); } } + + @Test + @DisplayName("DirectoryDeletingService batches PurgeDirectories by Ratis byte limit (via submitRequest spy)") + void testPurgeDirectoriesBatching() throws Exception { + final int ratisLimitBytes = 2304; + + OzoneConfiguration conf = new OzoneConfiguration(); + File testDir = Files.createTempDirectory("TestDDS-SubmitSpy").toFile(); + ServerUtils.setOzoneMetaDirPath(conf, testDir.toString()); + conf.setTimeDuration(OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL, 100, TimeUnit.MILLISECONDS); + conf.setStorageSize(OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, ratisLimitBytes, StorageUnit.BYTES); + conf.setQuietMode(false); + + OmTestManagers managers = new OmTestManagers(conf); + om = managers.getOzoneManager(); + KeyManager km = managers.getKeyManager(); + + DirectoryDeletingService real = (DirectoryDeletingService) km.getDirDeletingService(); + DirectoryDeletingService dds = Mockito.spy(real); + + List<OzoneManagerProtocolProtos.OMRequest> captured = new ArrayList<>(); + Mockito.doAnswer(inv -> { + OzoneManagerProtocolProtos.OMRequest req = inv.getArgument(0); + captured.add(req); + return OzoneManagerProtocolProtos.OMResponse.newBuilder() + .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories).setStatus(OzoneManagerProtocolProtos.Status.OK) + .build(); + }).when(dds).submitRequest(Mockito.any(OzoneManagerProtocolProtos.OMRequest.class)); + + final long volumeId = 1L, bucketId = 2L; + List<OzoneManagerProtocolProtos.PurgePathRequest> purgeList = new ArrayList<>(); + + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 30; i++) { + sb.append("0123456789"); + } + final String longSuffix = sb.toString(); + + for (int i = 0; i < 20; i++) { + purgeList.add(OzoneManagerProtocolProtos.PurgePathRequest.newBuilder().setVolumeId(volumeId).setBucketId(bucketId) + 
.setDeletedDir("dir-" + longSuffix + "-" + i).build()); + } + + org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId vbId = + new org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId(volumeId, bucketId); + OzoneManagerProtocolProtos.BucketNameInfo bni = + OzoneManagerProtocolProtos.BucketNameInfo.newBuilder().setVolumeId(volumeId).setBucketId(bucketId) + .setVolumeName("v").setBucketName("b").build(); + Map<org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId, OzoneManagerProtocolProtos.BucketNameInfo> + bucketNameInfoMap = new HashMap<>(); + bucketNameInfoMap.put(vbId, bni); + + dds.optimizeDirDeletesAndSubmitRequest(0L, 0L, 0L, new ArrayList<>(), purgeList, null, Time.monotonicNow(), km, + kv -> true, kv -> true, bucketNameInfoMap, null, 1L, new AtomicInteger(Integer.MAX_VALUE)); + + assertThat(captured.size()) + .as("Expect batching to respect Ratis byte limit") + .isBetween(3, 5); + + for (OzoneManagerProtocolProtos.OMRequest omReq : captured) { + assertThat(omReq.getCmdType()).isEqualTo(OzoneManagerProtocolProtos.Type.PurgeDirectories); + + OzoneManagerProtocolProtos.PurgeDirectoriesRequest purge = omReq.getPurgeDirectoriesRequest(); + int payloadBytes = + purge.getDeletedPathList().stream().mapToInt(OzoneManagerProtocolProtos.PurgePathRequest::getSerializedSize) + .sum(); + + assertThat(payloadBytes).as("Batch size should respect Ratis byte limit").isLessThanOrEqualTo(ratisLimitBytes); + } + + org.apache.commons.io.FileUtils.deleteDirectory(testDir); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
