This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c2e36513b HDDS-13494. DirectoryDeletingService incorrectly waits for 
all the deleted directories processing (#8852)
3c2e36513b is described below

commit 3c2e36513b88c0e73ecccca0f5dcf5f9c39d10f6
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Jul 25 14:41:09 2025 -0400

    HDDS-13494. DirectoryDeletingService incorrectly waits for all the deleted 
directories processing (#8852)
---
 .../ozone/om/service/DirectoryDeletingService.java |  9 +--
 .../om/service/TestDirectoryDeletingService.java   | 80 +++++++++++++++++++++-
 2 files changed, 82 insertions(+), 7 deletions(-)

diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 4ee29a5aee..24c5e1f096 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -320,7 +320,7 @@ void optimizeDirDeletesAndSubmitRequest(
   }
 
   private static final class DeletedDirSupplier implements Closeable {
-    private TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
+    private final TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
         deleteTableIterator;
 
     private DeletedDirSupplier(TableIterator<String, ? extends 
KeyValue<String, OmKeyInfo>> deleteTableIterator) {
@@ -335,7 +335,7 @@ private synchronized Table.KeyValue<String, OmKeyInfo> 
get() {
     }
 
     @Override
-    public void close() {
+    public synchronized void close() {
       IOUtils.closeQuietly(deleteTableIterator);
     }
   }
@@ -521,7 +521,8 @@ private 
OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ
      * @param currentSnapshotInfo if null, deleted directories in AOS should 
be processed.
      * @param keyManager KeyManager of the underlying store.
      */
-    private void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, 
KeyManager keyManager,
+    @VisibleForTesting
+    void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo, 
KeyManager keyManager,
         long remainingBufLimit, long rnCnt) throws IOException, 
ExecutionException, InterruptedException {
       String volume, bucket; String snapshotTableKey;
       if (currentSnapshotInfo != null) {
@@ -555,7 +556,7 @@ private void processDeletedDirsForStore(SnapshotInfo 
currentSnapshotInfo, KeyMan
               return false;
             }
           }, isThreadPoolActive(deletionThreadPool) ? deletionThreadPool : 
ForkJoinPool.commonPool());
-          processedAllDeletedDirs = future.thenCombine(future, (a, b) -> a && 
b);
+          processedAllDeletedDirs = 
processedAllDeletedDirs.thenCombine(future, (a, b) -> a && b);
         }
         // If AOS or all directories have been processed for snapshot, update 
snapshot size delta and deep clean flag
         // if it is a snapshot.
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index e7c56d07c0..1a55b63bb2 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -19,13 +19,22 @@
 
 import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
+import static 
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -49,11 +58,18 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Test Directory Deleting Service.
  */
 public class TestDirectoryDeletingService {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TestDirectoryDeletingService.class);
+
   @TempDir
   private Path folder;
   private OzoneManagerProtocol writeClient;
@@ -66,7 +82,7 @@ public static void setup() {
     ExitUtils.disableSystemExit();
   }
 
-  private OzoneConfiguration createConfAndInitValues() throws IOException {
+  private OzoneConfiguration createConfAndInitValues(int threadCount) throws 
IOException {
     OzoneConfiguration conf = new OzoneConfiguration();
     File newFolder = folder.toFile();
     if (!newFolder.exists()) {
@@ -76,6 +92,7 @@ private OzoneConfiguration createConfAndInitValues() throws 
IOException {
     ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
     conf.setTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, 3000,
         TimeUnit.MILLISECONDS);
+    conf.setInt(OZONE_THREAD_NUMBER_DIR_DELETION, threadCount);
     conf.set(OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, "4MB");
     conf.setQuietMode(false);
 
@@ -87,12 +104,14 @@ private OzoneConfiguration createConfAndInitValues() 
throws IOException {
 
   @AfterEach
   public void cleanup() throws Exception {
-    om.stop();
+    if (om != null) {
+      om.stop();
+    }
   }
 
   @Test
   public void testDeleteDirectoryCrossingSizeLimit() throws Exception {
-    OzoneConfiguration conf = createConfAndInitValues();
+    OzoneConfiguration conf = createConfAndInitValues(10);
     OmTestManagers omTestManagers
         = new OmTestManagers(conf);
     KeyManager keyManager = omTestManagers.getKeyManager();
@@ -158,4 +177,59 @@ public void testDeleteDirectoryCrossingSizeLimit() throws 
Exception {
         500, 60000);
     
assertThat(dirDeletingService.getRunCount().get()).isGreaterThanOrEqualTo(1);
   }
+
+  @Test
+  public void testMultithreadedDirectoryDeletion() throws Exception {
+    int threadCount = 10;
+    OzoneConfiguration conf = createConfAndInitValues(threadCount);
+    OmTestManagers omTestManagers
+        = new OmTestManagers(conf);
+    OzoneManager ozoneManager = omTestManagers.getOzoneManager();
+    AtomicBoolean isRunning = new AtomicBoolean(true);
+    try (MockedStatic mockedStatic = 
Mockito.mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) {
+      List<Pair<Supplier, CompletableFuture>> futureList = new ArrayList<>();
+      Thread deletionThread = new Thread(() -> {
+        while (futureList.size() < threadCount) {
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            LOG.error("Error while sleeping", e);
+          }
+        }
+        for (int i = futureList.size() - 1; i >= 0; i--) {
+          Pair<Supplier, CompletableFuture> pair = futureList.get(i);
+          pair.getLeft().get();
+          assertTrue(isRunning.get());
+          pair.getRight().complete(false);
+          try {
+            Thread.sleep(500);
+          } catch (InterruptedException e) {
+            LOG.error("Error while sleeping", e);
+          }
+        }
+      });
+      deletionThread.start();
+
+      mockedStatic
+          .when(() -> CompletableFuture.supplyAsync(any(), any()))
+          .thenAnswer(invocation -> {
+            Supplier<Boolean> supplier = invocation.getArgument(0);
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            futureList.add(Pair.of(supplier, future));
+            return future;
+          });
+      ozoneManager.getKeyManager().getDirDeletingService().suspend();
+      DirectoryDeletingService.DirDeletingTask dirDeletingTask =
+          ozoneManager.getKeyManager().getDirDeletingService().new 
DirDeletingTask(null);
+
+      dirDeletingTask.processDeletedDirsForStore(null, 
ozoneManager.getKeyManager(), Long.MAX_VALUE, 1);
+      assertThat(futureList).hasSize(threadCount);
+      for (Pair<Supplier, CompletableFuture> pair : futureList) {
+        assertTrue(pair.getRight().isDone());
+      }
+      isRunning.set(false);
+    } finally {
+      ozoneManager.stop();
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to