This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 23a62e48843 HDDS-13729. Acquire Bulk Bucket locks in order to prevent 
deadlock in OmDirectoryPurgeRequest (#9084)
23a62e48843 is described below

commit 23a62e488433543bbb592a014f79ca6a81f2f44e
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Sat Oct 4 05:36:02 2025 -0400

    HDDS-13729. Acquire Bulk Bucket locks in order to prevent deadlock in 
OmDirectoryPurgeRequest (#9084)
---
 .../TestDirectoryDeletingServiceWithFSO.java       |   3 +-
 .../src/main/proto/OmClientProtocol.proto          |   8 +
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  49 ++++++
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  13 ++
 .../key/OMDirectoriesPurgeRequestWithFSO.java      |  50 ++++--
 .../ozone/om/service/DirectoryDeletingService.java |  43 +++--
 .../TestOMDirectoriesPurgeRequestAndResponse.java  | 181 ++++++++++++++++++---
 .../ozone/om/request/key/TestOMKeyRequest.java     |   5 +-
 8 files changed, 299 insertions(+), 53 deletions(-)

diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
index c8ebcb083f1..81e1dd3b444 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingServiceWithFSO.java
@@ -30,6 +30,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyMap;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.when;
 
@@ -623,7 +624,7 @@ public void 
testAOSKeyDeletingWithSnapshotCreateParallelExecution()
       return null;
     }).when(service).optimizeDirDeletesAndSubmitRequest(anyLong(), anyLong(),
         anyLong(), anyList(), anyList(), eq(null), anyLong(), anyLong(), any(),
-        any(ReclaimableDirFilter.class), any(ReclaimableKeyFilter.class), 
any(),
+        any(ReclaimableDirFilter.class), any(ReclaimableKeyFilter.class), 
anyMap(), any(),
         anyLong());
 
     Mockito.doAnswer(i -> {
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index d6fc15a45a4..01f4699a88a 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -1438,6 +1438,14 @@ message PurgeDirectoriesRequest {
   optional string snapshotTableKey = 2;
   // previous snapshotID can also be null & this field would be absent in 
older requests.
   optional NullableUUID expectedPreviousSnapshotID = 3;
+  repeated BucketNameInfo bucketNameInfos = 4;
+}
+
+message BucketNameInfo {
+  optional uint64 volumeId = 1;
+  optional uint64 bucketId = 2;
+  optional string volumeName = 3;
+  optional string bucketName = 4;
 }
 
 message NullableUUID {
diff --git 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index a8e5b2dab41..b52c738b568 100644
--- 
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ 
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -25,6 +25,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.utils.DBStoreHAManager;
@@ -135,6 +136,15 @@ public interface OMMetadataManager extends 
DBStoreHAManager, AutoCloseable {
    */
   String getBucketKeyPrefixFSO(String volume, String bucket) throws 
IOException;
 
+
+  /**
+   * Retrieves the volume ID and bucket ID associated with the provided 
FSO (File System Optimized) key.
+   *
+   * @param fsoKey the FSO key, used to 
identify the corresponding volume and bucket.
+   * @return a VolumeBucketId holding the volume ID and the bucket ID 
for the given key.
+   */
+  VolumeBucketId getVolumeBucketIdPairFSO(String fsoKey) throws IOException;
+
   /**
    * Given a volume, bucket and a key, return the corresponding DB key.
    *
@@ -675,4 +685,43 @@ String getMultipartKey(long volumeId, long bucketId,
    */
   boolean containsIncompleteMPUs(String volume, String bucket)
       throws IOException;
+
+  /**
+   * Represents a unique identifier for a specific bucket within a volume.
+   *
+   * This class combines a volume identifier and a bucket identifier
+   * to uniquely identify a bucket within a storage system.
+   */
+  class VolumeBucketId {
+    private final long volumeId;
+    private final long bucketId;
+
+    public VolumeBucketId(long volumeId, long bucketId) {
+      this.volumeId = volumeId;
+      this.bucketId = bucketId;
+    }
+
+    public long getBucketId() {
+      return bucketId;
+    }
+
+    public long getVolumeId() {
+      return volumeId;
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+      if (!(o instanceof VolumeBucketId)) {
+        return false;
+      }
+
+      VolumeBucketId that = (VolumeBucketId) o;
+      return volumeId == that.volumeId && bucketId == that.bucketId;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(volumeId, bucketId);
+    }
+  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 417241c9e4f..27dcb6a24e0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -544,6 +544,19 @@ public String getBucketKeyPrefixFSO(String volume, String 
bucket) throws IOExcep
     return getOzoneKeyFSO(volume, bucket, OM_KEY_PREFIX);
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public VolumeBucketId getVolumeBucketIdPairFSO(String fsoKey) throws 
IOException {
+    String[] keySplit = fsoKey.split(OM_KEY_PREFIX);
+    try {
+      return new VolumeBucketId(Long.parseLong(keySplit[1]), 
Long.parseLong(keySplit[2]));
+    } catch (NumberFormatException e) {
+      throw new IOException("Invalid format for FSO Key: " + fsoKey, e);
+    }
+  }
+
   @Override
   public String getOzoneKey(String volume, String bucket, String key) {
     StringBuilder builder = new StringBuilder()
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
index 3e817b9c86c..e30a66aa124 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMDirectoriesPurgeRequestWithFSO.java
@@ -30,6 +30,8 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
@@ -42,6 +44,7 @@
 import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.execution.flowcontrol.ExecutionContext;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -54,6 +57,7 @@
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgeDirectoriesRequest;
 
 /**
  * Handles purging of keys from OM DB.
@@ -75,14 +79,13 @@ public OMDirectoriesPurgeRequestWithFSO(OMRequest 
omRequest) {
   @Override
   @SuppressWarnings("methodlength")
   public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager, 
ExecutionContext context) {
-    OzoneManagerProtocolProtos.PurgeDirectoriesRequest purgeDirsRequest =
+    PurgeDirectoriesRequest purgeDirsRequest =
         getOmRequest().getPurgeDirectoriesRequest();
     String fromSnapshot = purgeDirsRequest.hasSnapshotTableKey() ?
         purgeDirsRequest.getSnapshotTableKey() : null;
 
     List<OzoneManagerProtocolProtos.PurgePathRequest> purgeRequests =
             purgeDirsRequest.getDeletedPathList();
-    Set<Pair<String, String>> lockSet = new HashSet<>();
     Map<Pair<String, String>, OmBucketInfo> volBucketInfoMap = new HashMap<>();
     OmMetadataManagerImpl omMetadataManager = (OmMetadataManagerImpl) 
ozoneManager.getMetadataManager();
     Map<String, OmKeyInfo> openKeyInfoMap = new HashMap<>();
@@ -116,6 +119,15 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
       
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.DIRECTORY_DELETION,
 null, e));
       return new 
OMDirectoriesPurgeResponseWithFSO(createErrorOMResponse(omResponse, e));
     }
+    List<String[]> bucketLockKeys = getBucketLockKeySet(purgeDirsRequest);
+    boolean lockAcquired = 
omMetadataManager.getLock().acquireWriteLocks(BUCKET_LOCK, 
bucketLockKeys).isLockAcquired();
+    if (!lockAcquired && !purgeDirsRequest.getBucketNameInfosList().isEmpty()) 
{
+      OMException oe = new OMException("Unable to acquire write locks on 
buckets while performing DirectoryPurge",
+          OMException.ResultCodes.KEY_DELETION_ERROR);
+      LOG.error("Error occurred while performing OMDirectoriesPurge. ", oe);
+      
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.DIRECTORY_DELETION,
 null, oe));
+      return new 
OMDirectoriesPurgeResponseWithFSO(createErrorOMResponse(omResponse, oe));
+    }
     try {
       int numSubDirMoved = 0, numSubFilesMoved = 0, numDirsDeleted = 0;
       for (OzoneManagerProtocolProtos.PurgePathRequest path : purgeRequests) {
@@ -133,11 +145,7 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
           String volumeName = keyInfo.getVolumeName();
           String bucketName = keyInfo.getBucketName();
           Pair<String, String> volBucketPair = Pair.of(volumeName, bucketName);
-          if (!lockSet.contains(volBucketPair)) {
-            omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
-                volumeName, bucketName);
-            lockSet.add(volBucketPair);
-          }
+
           omMetrics.decNumKeys();
           OmBucketInfo omBucketInfo = getBucketInfo(omMetadataManager,
               volumeName, bucketName);
@@ -167,11 +175,6 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
           String volumeName = keyInfo.getVolumeName();
           String bucketName = keyInfo.getBucketName();
           Pair<String, String> volBucketPair = Pair.of(volumeName, bucketName);
-          if (!lockSet.contains(volBucketPair)) {
-            omMetadataManager.getLock().acquireWriteLock(BUCKET_LOCK,
-                volumeName, bucketName);
-            lockSet.add(volBucketPair);
-          }
 
           // If omKeyInfo has hsync metadata, delete its corresponding open 
key as well
           String dbOpenKey;
@@ -239,17 +242,34 @@ public OMClientResponse 
validateAndUpdateCache(OzoneManager ozoneManager, Execut
       
AUDIT.logWriteFailure(ozoneManager.buildAuditMessageForFailure(OMSystemAction.DIRECTORY_DELETION,
 null, ex));
       throw new IllegalStateException(ex);
     } finally {
-      lockSet.stream().forEach(e -> omMetadataManager.getLock()
-          .releaseWriteLock(BUCKET_LOCK, e.getKey(),
-              e.getValue()));
       for (Map.Entry<Pair<String, String>, OmBucketInfo> entry :
           volBucketInfoMap.entrySet()) {
         entry.setValue(entry.getValue().copyObject());
       }
+      omMetadataManager.getLock().releaseWriteLocks(BUCKET_LOCK, 
bucketLockKeys);
     }
 
     return new OMDirectoriesPurgeResponseWithFSO(
         omResponse.build(), purgeRequests,
         getBucketLayout(), volBucketInfoMap, fromSnapshotInfo, openKeyInfoMap);
   }
+
+  private List<String[]> getBucketLockKeySet(PurgeDirectoriesRequest 
purgeDirsRequest) {
+    if (!purgeDirsRequest.getBucketNameInfosList().isEmpty()) {
+      return purgeDirsRequest.getBucketNameInfosList().stream()
+          .map(keyInfo -> Pair.of(keyInfo.getVolumeName(), 
keyInfo.getBucketName()))
+          .distinct()
+          .map(pair -> new String[]{pair.getLeft(), pair.getRight()})
+          .collect(Collectors.toList());
+    }
+
+    return purgeDirsRequest.getDeletedPathList().stream()
+        .flatMap(purgePathRequest -> 
Stream.concat(purgePathRequest.getDeletedSubFilesList().stream(),
+            purgePathRequest.getMarkDeletedSubDirsList().stream()))
+        .map(keyInfo -> Pair.of(keyInfo.getVolumeName(), 
keyInfo.getBucketName()))
+        .distinct()
+        .map(pair -> new String[]{pair.getLeft(), pair.getRight()})
+        .collect(Collectors.toList());
+  }
+
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 90ad878c640..7abf1406d96 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -17,7 +17,6 @@
 
 package org.apache.hadoop.ozone.om.service;
 
-import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL_DEFAULT;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
@@ -30,9 +29,11 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -64,6 +65,7 @@
 import org.apache.hadoop.ozone.om.KeyManager;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetadataManager.VolumeBucketId;
 import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.OmSnapshot;
 import org.apache.hadoop.ozone.om.OmSnapshotManager;
@@ -76,6 +78,7 @@
 import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableDirFilter;
 import org.apache.hadoop.ozone.om.snapshot.filter.ReclaimableKeyFilter;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.BucketNameInfo;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
 import org.apache.hadoop.util.Time;
 import org.apache.ratis.util.function.CheckedFunction;
@@ -262,6 +265,7 @@ void optimizeDirDeletesAndSubmitRequest(
       long remainingBufLimit, KeyManager keyManager,
       CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> 
reclaimableDirChecker,
       CheckedFunction<KeyValue<String, OmKeyInfo>, Boolean, IOException> 
reclaimableFileChecker,
+      Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap,
       UUID expectedPreviousSnapshotId, long rnCnt) throws InterruptedException 
{
 
     // Optimization to handle delete sub-dir and keys to remove quickly
@@ -297,7 +301,7 @@ void optimizeDirDeletesAndSubmitRequest(
       }
     }
     if (!purgePathRequestList.isEmpty()) {
-      submitPurgePaths(purgePathRequestList, snapTableKey, 
expectedPreviousSnapshotId);
+      submitPurgePaths(purgePathRequestList, snapTableKey, 
expectedPreviousSnapshotId, bucketNameInfoMap);
     }
 
     if (dirNum != 0 || subDirNum != 0 || subFileNum != 0) {
@@ -382,21 +386,20 @@ private Optional<PurgePathRequest> 
prepareDeleteDirRequest(
           pendingDeletedDirInfo.getKeyName());
     }
 
-    final String[] keys = delDirName.split(OM_KEY_PREFIX);
-    final long volumeId = Long.parseLong(keys[1]);
-    final long bucketId = Long.parseLong(keys[2]);
+    VolumeBucketId volumeBucketId = keyManager.getMetadataManager()
+        .getVolumeBucketIdPairFSO(delDirName);
 
     // step-1: get all sub directories under the deletedDir
     DeleteKeysResult subDirDeleteResult =
-        keyManager.getPendingDeletionSubDirs(volumeId, bucketId,
+        keyManager.getPendingDeletionSubDirs(volumeBucketId.getVolumeId(), 
volumeBucketId.getBucketId(),
             pendingDeletedDirInfo, keyInfo -> true, remainingBufLimit);
     List<OmKeyInfo> subDirs = subDirDeleteResult.getKeysToDelete();
     remainingBufLimit -= subDirDeleteResult.getConsumedSize();
 
     OMMetadataManager omMetadataManager = keyManager.getMetadataManager();
     for (OmKeyInfo dirInfo : subDirs) {
-      String ozoneDbKey = omMetadataManager.getOzonePathKey(volumeId,
-          bucketId, dirInfo.getParentObjectID(), dirInfo.getFileName());
+      String ozoneDbKey = 
omMetadataManager.getOzonePathKey(volumeBucketId.getVolumeId(),
+          volumeBucketId.getBucketId(), dirInfo.getParentObjectID(), 
dirInfo.getFileName());
       String ozoneDeleteKey = omMetadataManager.getOzoneDeletePathKey(
           dirInfo.getObjectID(), ozoneDbKey);
       subDirList.add(Pair.of(ozoneDeleteKey, dirInfo));
@@ -406,7 +409,7 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest(
     // step-2: get all sub files under the deletedDir
     // Only remove sub files if the parent directory is going to be deleted or 
can be reclaimed.
     DeleteKeysResult subFileDeleteResult =
-        keyManager.getPendingDeletionSubFiles(volumeId, bucketId,
+        keyManager.getPendingDeletionSubFiles(volumeBucketId.getVolumeId(), 
volumeBucketId.getBucketId(),
             pendingDeletedDirInfo, keyInfo -> purgeDir || 
reclaimableFileFilter.apply(keyInfo), remainingBufLimit);
     List<OmKeyInfo> subFiles = subFileDeleteResult.getKeysToDelete();
 
@@ -423,7 +426,7 @@ private Optional<PurgePathRequest> prepareDeleteDirRequest(
     if (purgeDeletedDir == null && subFiles.isEmpty() && subDirs.isEmpty()) {
       return Optional.empty();
     }
-    return Optional.of(wrapPurgeRequest(volumeId, bucketId,
+    return Optional.of(wrapPurgeRequest(volumeBucketId.getVolumeId(), 
volumeBucketId.getBucketId(),
         purgeDeletedDir, subFiles, subDirs));
   }
 
@@ -458,7 +461,8 @@ private OzoneManagerProtocolProtos.PurgePathRequest 
wrapPurgeRequest(
   }
 
   private OzoneManagerProtocolProtos.OMResponse 
submitPurgePaths(List<PurgePathRequest> requests,
-      String snapTableKey, UUID expectedPreviousSnapshotId) throws 
InterruptedException {
+      String snapTableKey, UUID expectedPreviousSnapshotId, 
Map<VolumeBucketId, BucketNameInfo> bucketNameInfoMap)
+      throws InterruptedException {
     OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest 
=
         OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
 
@@ -473,6 +477,9 @@ private OzoneManagerProtocolProtos.OMResponse 
submitPurgePaths(List<PurgePathReq
     
purgeDirRequest.setExpectedPreviousSnapshotID(expectedPreviousSnapshotNullableUUID.build());
 
     purgeDirRequest.addAllDeletedPath(requests);
+    
purgeDirRequest.addAllBucketNameInfos(requests.stream().map(purgePathRequest ->
+        new VolumeBucketId(purgePathRequest.getVolumeId(), 
purgePathRequest.getBucketId())).distinct()
+        
.map(bucketNameInfoMap::get).filter(Objects::nonNull).collect(Collectors.toList()));
 
     OzoneManagerProtocolProtos.OMRequest omRequest =
         OzoneManagerProtocolProtos.OMRequest.newBuilder()
@@ -613,12 +620,24 @@ private boolean processDeletedDirectories(SnapshotInfo 
currentSnapshotInfo, KeyM
         long subFileNum = 0L;
         int consumedSize = 0;
         List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
+        Map<VolumeBucketId, BucketNameInfo> bucketNameInfos = new HashMap<>();
+
         List<Pair<String, OmKeyInfo>> allSubDirList = new ArrayList<>();
         while (remainingBufLimit > 0) {
           KeyValue<String, OmKeyInfo> pendingDeletedDirInfo = 
dirSupplier.get();
           if (pendingDeletedDirInfo == null) {
             break;
           }
+          OmKeyInfo deletedDirInfo = pendingDeletedDirInfo.getValue();
+          VolumeBucketId volumeBucketId =
+              
keyManager.getMetadataManager().getVolumeBucketIdPairFSO(pendingDeletedDirInfo.getKey());
+          bucketNameInfos.computeIfAbsent(volumeBucketId,
+              (k) -> 
BucketNameInfo.newBuilder().setVolumeId(volumeBucketId.getVolumeId())
+              .setBucketId(volumeBucketId.getBucketId())
+              .setVolumeName(deletedDirInfo.getVolumeName())
+              .setBucketName(deletedDirInfo.getBucketName())
+              .build());
+
           boolean isDirReclaimable = 
reclaimableDirFilter.apply(pendingDeletedDirInfo);
           Optional<PurgePathRequest> request = prepareDeleteDirRequest(
               pendingDeletedDirInfo.getValue(),
@@ -642,7 +661,7 @@ private boolean processDeletedDirectories(SnapshotInfo 
currentSnapshotInfo, KeyM
         optimizeDirDeletesAndSubmitRequest(dirNum, subDirNum,
             subFileNum, allSubDirList, purgePathRequestList, snapshotTableKey,
             startTime, remainingBufLimit, getOzoneManager().getKeyManager(),
-            reclaimableDirFilter, reclaimableFileFilter, 
expectedPreviousSnapshotId,
+            reclaimableDirFilter, reclaimableFileFilter, bucketNameInfos, 
expectedPreviousSnapshotId,
             runCount);
         Map<UUID, Long> exclusiveReplicatedSizeMap = 
reclaimableFileFilter.getExclusiveReplicatedSizeMap();
         Map<UUID, Long> exclusiveSizeMap = 
reclaimableFileFilter.getExclusiveSizeMap();
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
index dff5a74173b..96f0fd63da0 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMDirectoriesPurgeRequestAndResponse.java
@@ -18,20 +18,33 @@
 package org.apache.hadoop.ozone.om.request.key;
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static 
org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.BUCKET_LOCK;
 import static 
org.apache.hadoop.ozone.om.request.file.OMFileRequest.getOmKeyInfo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import jakarta.annotation.Nonnull;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.Random;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -49,12 +62,16 @@
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
 import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
 import 
org.apache.hadoop.ozone.om.response.key.OMDirectoriesPurgeResponseWithFSO;
 import org.apache.hadoop.ozone.om.response.key.OMKeyPurgeResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.PurgePathRequest;
 import org.apache.hadoop.util.Time;
+import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -127,45 +144,48 @@ private OMRequest createPurgeKeysRequest(String 
fromSnapshot, String purgeDelete
     return createPurgeKeysRequest(fromSnapshot, purgeDeletedDir, 
Collections.emptyList(), keyList, bucketInfo);
   }
 
+  private OMRequest createPurgeKeysRequest(String fromSnapshot,
+      List<PurgePathRequest> purgePathRequestList) {
+    OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest 
=
+        OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
+    purgeDirRequest.addAllDeletedPath(purgePathRequestList);
+    if (fromSnapshot != null) {
+      purgeDirRequest.setSnapshotTableKey(fromSnapshot);
+    }
+    OzoneManagerProtocolProtos.OMRequest omRequest =
+        OzoneManagerProtocolProtos.OMRequest.newBuilder()
+            .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
+            .setPurgeDirectoriesRequest(purgeDirRequest)
+            .setClientId(UUID.randomUUID().toString())
+            .build();
+    return omRequest;
+  }
+
   /**
    * Create OMRequest which encapsulates DeleteKeyRequest.
    * @return OMRequest
    */
   private OMRequest createPurgeKeysRequest(String fromSnapshot, String 
purgeDeletedDir,
       List<OmKeyInfo> subDirs, List<OmKeyInfo> keyList, OmBucketInfo 
bucketInfo) throws IOException {
-    List<OzoneManagerProtocolProtos.PurgePathRequest> purgePathRequestList
-        = new ArrayList<>();
+    List<PurgePathRequest> purgePathRequestList = new ArrayList<>();
     List<OmKeyInfo> subFiles = new ArrayList<>();
     for (OmKeyInfo key : keyList) {
       subFiles.add(key);
     }
     Long volumeId = omMetadataManager.getVolumeId(bucketInfo.getVolumeName());
     Long bucketId = bucketInfo.getObjectID();
-    OzoneManagerProtocolProtos.PurgePathRequest request = wrapPurgeRequest(
+    PurgePathRequest request = wrapPurgeRequest(
         volumeId, bucketId, purgeDeletedDir, subFiles, subDirs);
     purgePathRequestList.add(request);
-    
-    OzoneManagerProtocolProtos.PurgeDirectoriesRequest.Builder purgeDirRequest 
=
-        OzoneManagerProtocolProtos.PurgeDirectoriesRequest.newBuilder();
-    purgeDirRequest.addAllDeletedPath(purgePathRequestList);
-    if (fromSnapshot != null) {
-      purgeDirRequest.setSnapshotTableKey(fromSnapshot);
-    }
-    OzoneManagerProtocolProtos.OMRequest omRequest =
-        OzoneManagerProtocolProtos.OMRequest.newBuilder()
-            .setCmdType(OzoneManagerProtocolProtos.Type.PurgeDirectories)
-            .setPurgeDirectoriesRequest(purgeDirRequest)
-            .setClientId(UUID.randomUUID().toString())
-            .build();
-    return omRequest;
+    return createPurgeKeysRequest(fromSnapshot, purgePathRequestList);
   }
 
-  private OzoneManagerProtocolProtos.PurgePathRequest wrapPurgeRequest(
+  private PurgePathRequest wrapPurgeRequest(
       final long volumeId, final long bucketId, final String purgeDeletedDir,
       final List<OmKeyInfo> purgeDeletedFiles, final List<OmKeyInfo> 
markDirsAsDeleted) {
     // Put all keys to be purged in a list
-    OzoneManagerProtocolProtos.PurgePathRequest.Builder purgePathsRequest
-        = OzoneManagerProtocolProtos.PurgePathRequest.newBuilder();
+    PurgePathRequest.Builder purgePathsRequest
+        = PurgePathRequest.newBuilder();
     purgePathsRequest.setVolumeId(volumeId);
     purgePathsRequest.setBucketId(bucketId);
 
@@ -200,13 +220,128 @@ private OMRequest preExecute(OMRequest 
originalOmRequest) throws IOException {
     return modifiedOmRequest;
   }
 
+  /**
+   * Populates the given bucket with a directory {@code dir1} holding nine
+   * sub-directories and nine files, deletes {@code dir1}, and builds the
+   * purge request describing that deleted directory and its children.
+   *
+   * @param bucketInfo bucket whose object ID is used as the parent of
+   *                   {@code dir1} and whose name is used for all entries
+   * @return the {@code PurgePathRequest} produced by {@code wrapPurgeRequest}
+   *         for the deleted directory key, the created files and the created
+   *         sub-directories
+   * @throws Exception if any of the metadata helper calls fail
+   */
+  private PurgePathRequest createBucketDataAndGetPurgePathRequest(OmBucketInfo 
bucketInfo) throws Exception {
+    // Top-level directory, parented directly on the bucket.
+    OmDirectoryInfo dir1 = new OmDirectoryInfo.Builder()
+        .setName("dir1")
+        .setCreationTime(Time.now())
+        .setModificationTime(Time.now())
+        .setObjectID(1)
+        .setParentObjectID(bucketInfo.getObjectID())
+        .setUpdateID(0)
+        .build();
+    String dirKey = OMRequestTestUtils.addDirKeyToDirTable(false, dir1, 
volumeName,
+        bucketInfo.getBucketName(), 1L, omMetadataManager);
+    List<OmKeyInfo> subFiles = new ArrayList<>();
+    List<OmKeyInfo> subDirs = new ArrayList<>();
+    List<String> subFileKeys = new ArrayList<>();
+    List<String> subDirKeys = new ArrayList<>();
+    // Children of dir1: sub-directories get even object IDs (2 * id) and
+    // files get odd object IDs (2 * id + 1), so the IDs never collide.
+    for (int id = 1; id < 10; id++) {
+      OmDirectoryInfo subdir = new OmDirectoryInfo.Builder()
+          .setName("subdir" + id)
+          .setCreationTime(Time.now())
+          .setModificationTime(Time.now())
+          .setObjectID(2 * id)
+          .setParentObjectID(dir1.getObjectID())
+          .setUpdateID(0)
+          .build();
+      String subDirectoryPath = OMRequestTestUtils.addDirKeyToDirTable(false, 
subdir, volumeName,
+          bucketInfo.getBucketName(), 2 * id, omMetadataManager);
+      subDirKeys.add(subDirectoryPath);
+      OmKeyInfo subFile =
+          OMRequestTestUtils.createOmKeyInfo(volumeName, 
bucketInfo.getBucketName(), "file" + id,
+                  RatisReplicationConfig.getInstance(ONE))
+              .setObjectID(2 * id + 1)
+              .setParentObjectID(dir1.getObjectID())
+              .setUpdateID(100L)
+              .build();
+      String subFilePath = OMRequestTestUtils.addFileToKeyTable(false, true, 
subFile.getKeyName(),
+          subFile, 1234L, 2 * id + 1, omMetadataManager);
+      subFileKeys.add(subFilePath);
+      // The file is stored under its bare name; only afterwards is the key
+      // name prefixed with "dir1/" for use in the purge request.
+      subFile.setKeyName("dir1/" + subFile.getKeyName());
+      subFiles.add(subFile);
+      subDirs.add(getOmKeyInfo(volumeName, bucketInfo.getBucketName(), subdir,
+          "dir1/" + subdir.getName()));
+    }
+    // NOTE(review): deleteDir presumably moves dir1 into the deleted
+    // directory table and returns its key there - confirm in
+    // OMRequestTestUtils.
+    String deletedDirKey = OMRequestTestUtils.deleteDir(dirKey, volumeName, 
bucketInfo.getBucketName(),
+        omMetadataManager);
+    // Sanity checks: the children are still present, only dir1 itself is gone
+    // from the directory table.
+    for (String subDirKey : subDirKeys) {
+      assertTrue(omMetadataManager.getDirectoryTable().isExist(subDirKey));
+    }
+    for (String subFileKey : subFileKeys) {
+      assertTrue(omMetadataManager.getFileTable().isExist(subFileKey));
+    }
+    assertFalse(omMetadataManager.getDirectoryTable().isExist(dirKey));
+    Long volumeId = omMetadataManager.getVolumeId(bucketInfo.getVolumeName());
+    long bucketId = bucketInfo.getObjectID();
+    return wrapPurgeRequest(volumeId, bucketId, deletedDirKey, subFiles, 
subDirs);
+  }
+
+  /**
+   * Runs two directory purge requests concurrently that reference the same
+   * two buckets in opposite order, and checks that both complete and that the
+   * bucket write locks of both buckets were acquired.
+   *
+   * <p>The lock is wrapped in a spy. A thread that has already recorded a
+   * lock acquisition is held back until two distinct threads have recorded
+   * one, which forces the two requests to interleave their locking.
+   *
+   * @throws Exception if test data setup or either request fails
+   */
+  @Test
+  public void testBucketLockWithPurgeDirectory() throws Exception {
+    when(ozoneManager.getDefaultReplicationConfig())
+        .thenReturn(RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+    String bucket1 = "bucket" + RandomUtils.secure().randomInt();
+    // Add volume, bucket and key entries to OM DB.
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucket1,
+        omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    String bucketKey1 = omMetadataManager.getBucketKey(volumeName, bucket1);
+    OmBucketInfo bucketInfo1 = omMetadataManager.getBucketTable().get(bucketKey1);
+    PurgePathRequest purgePathRequest1 = createBucketDataAndGetPurgePathRequest(bucketInfo1);
+    String bucket2 = "bucket" + RandomUtils.secure().randomInt();
+    // Add volume, bucket and key entries to OM DB.
+    OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucket2,
+        omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
+    // Must resolve the second bucket; looking up bucket1 again would make both
+    // purge requests target the same bucket and the test would pass trivially.
+    String bucketKey2 = omMetadataManager.getBucketKey(volumeName, bucket2);
+    OmBucketInfo bucketInfo2 = omMetadataManager.getBucketTable().get(bucketKey2);
+    PurgePathRequest purgePathRequest2 = createBucketDataAndGetPurgePathRequest(bucketInfo2);
+    IOzoneManagerLock lock = spy(omMetadataManager.getLock());
+    // IDs of threads that have acquired at least one bucket lock.
+    Set<Long> acquiredLockIds = new ConcurrentSkipListSet<>();
+    // "volume/bucket" keys of every bucket lock that was acquired.
+    Set<String> acquiredLockKeys = new ConcurrentSkipListSet<>();
+    try {
+      // Single-bucket lock path.
+      doAnswer(i -> {
+        long threadId = Thread.currentThread().getId();
+        // Proceed immediately on this thread's first acquisition; otherwise
+        // wait until both threads have acquired something.
+        GenericTestUtils.waitFor(() -> !acquiredLockIds.contains(threadId) || acquiredLockIds.size() == 2, 1000, 30000);
+        OMLockDetails lockDetails = (OMLockDetails) i.callRealMethod();
+        acquiredLockIds.add(threadId);
+        acquiredLockKeys.add(i.getArgument(1) + "/" + i.getArgument(2));
+        return lockDetails;
+      }).when(lock).acquireWriteLock(eq(BUCKET_LOCK), anyString(), anyString());
+
+      // Bulk lock path: same gating, but records every key in the batch.
+      doAnswer(i -> {
+        long threadId = Thread.currentThread().getId();
+        GenericTestUtils.waitFor(() -> !acquiredLockIds.contains(threadId) || acquiredLockIds.size() == 2, 1000, 30000);
+        OMLockDetails lockDetails = (OMLockDetails) i.callRealMethod();
+        acquiredLockIds.add(threadId);
+        for (String[] lockKey : (List<String[]>) i.getArgument(1)) {
+          acquiredLockKeys.add(lockKey[0] + "/" + lockKey[1]);
+        }
+        return lockDetails;
+      }).when(lock).acquireWriteLocks(eq(BUCKET_LOCK), anyCollection());
+      when(omMetadataManager.getLock()).thenReturn(lock);
+      // Same two buckets, listed in opposite order in each request.
+      OMDirectoriesPurgeRequestWithFSO purgePathRequests1 = new OMDirectoriesPurgeRequestWithFSO(
+          preExecute(createPurgeKeysRequest(null, Arrays.asList(purgePathRequest1, purgePathRequest2))));
+      OMDirectoriesPurgeRequestWithFSO purgePathRequests2 = new OMDirectoriesPurgeRequestWithFSO(
+          preExecute(createPurgeKeysRequest(null, Arrays.asList(purgePathRequest2, purgePathRequest1))));
+      CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
+          purgePathRequests1.validateAndUpdateCache(ozoneManager, 100L));
+      CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
+          purgePathRequests2.validateAndUpdateCache(ozoneManager, 100L));
+      // get() rethrows any failure from the worker threads.
+      future1.get();
+      future2.get();
+      assertEquals(Stream.of(bucketInfo1.getVolumeName() + "/" + bucketInfo1.getBucketName(),
+              bucketInfo2.getVolumeName() + "/" + bucketInfo2.getBucketName()).collect(Collectors.toSet()),
+          acquiredLockKeys);
+    } finally {
+      reset(lock);
+    }
+  }
+
   @ParameterizedTest
   @CsvSource(value = {"false,false", "false,true", "true,false", "true,true"})
   public void testDirectoryPurge(boolean fromSnapshot, boolean purgeDirectory) 
throws Exception {
     when(ozoneManager.getDefaultReplicationConfig())
         
.thenReturn(RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
-    Random random = new Random();
-    String bucket = "bucket" + random.nextInt();
+    String bucket = "bucket" + RandomUtils.secure().randomInt();
     // Add volume, bucket and key entries to OM DB.
     OMRequestTestUtils.addVolumeAndBucketToDB(volumeName, bucket,
         omMetadataManager, BucketLayout.FILE_SYSTEM_OPTIMIZED);
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 1d29e37d80e..f0e32ac405b 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -27,6 +27,7 @@
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.framework;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
 import jakarta.annotation.Nonnull;
@@ -145,8 +146,8 @@ public void setup() throws Exception {
         folder.toAbsolutePath().toString());
     
ozoneConfiguration.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, 
true);
     ozoneConfiguration.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, 
true);
-    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration,
-        ozoneManager);
+    omMetadataManager = spy(new OmMetadataManagerImpl(ozoneConfiguration,
+        ozoneManager));
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getPerfMetrics()).thenReturn(perfMetrics);
     when(ozoneManager.getDeletionMetrics()).thenReturn(delMetrics);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to