This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 3c2e36513b HDDS-13494. DirectoryDeletingService incorrectly waits for
all the deleted directories processing (#8852)
3c2e36513b is described below
commit 3c2e36513b88c0e73ecccca0f5dcf5f9c39d10f6
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Fri Jul 25 14:41:09 2025 -0400
HDDS-13494. DirectoryDeletingService incorrectly waits for all the deleted
directories processing (#8852)
---
.../ozone/om/service/DirectoryDeletingService.java | 9 +--
.../om/service/TestDirectoryDeletingService.java | 80 +++++++++++++++++++++-
2 files changed, 82 insertions(+), 7 deletions(-)
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
index 4ee29a5aee..24c5e1f096 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/service/DirectoryDeletingService.java
@@ -320,7 +320,7 @@ void optimizeDirDeletesAndSubmitRequest(
}
private static final class DeletedDirSupplier implements Closeable {
- private TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
+ private final TableIterator<String, ? extends KeyValue<String, OmKeyInfo>>
deleteTableIterator;
private DeletedDirSupplier(TableIterator<String, ? extends
KeyValue<String, OmKeyInfo>> deleteTableIterator) {
@@ -335,7 +335,7 @@ private synchronized Table.KeyValue<String, OmKeyInfo>
get() {
}
@Override
- public void close() {
+ public synchronized void close() {
IOUtils.closeQuietly(deleteTableIterator);
}
}
@@ -521,7 +521,8 @@ private
OzoneManagerProtocolProtos.SetSnapshotPropertyRequest getSetSnapshotRequ
* @param currentSnapshotInfo if null, deleted directories in AOS should
be processed.
* @param keyManager KeyManager of the underlying store.
*/
- private void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo,
KeyManager keyManager,
+ @VisibleForTesting
+ void processDeletedDirsForStore(SnapshotInfo currentSnapshotInfo,
KeyManager keyManager,
long remainingBufLimit, long rnCnt) throws IOException,
ExecutionException, InterruptedException {
String volume, bucket; String snapshotTableKey;
if (currentSnapshotInfo != null) {
@@ -555,7 +556,7 @@ private void processDeletedDirsForStore(SnapshotInfo
currentSnapshotInfo, KeyMan
return false;
}
}, isThreadPoolActive(deletionThreadPool) ? deletionThreadPool :
ForkJoinPool.commonPool());
- processedAllDeletedDirs = future.thenCombine(future, (a, b) -> a &&
b);
+ processedAllDeletedDirs =
processedAllDeletedDirs.thenCombine(future, (a, b) -> a && b);
}
// If AOS or all directories have been processed for snapshot, update
snapshot size delta and deep clean flag
// if it is a snapshot.
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
index e7c56d07c0..1a55b63bb2 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/service/TestDirectoryDeletingService.java
@@ -19,13 +19,22 @@
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_DIR_DELETING_SERVICE_INTERVAL;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_THREAD_NUMBER_DIR_DELETION;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.CALLS_REAL_METHODS;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.RatisReplicationConfig;
import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -49,11 +58,18 @@
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test Directory Deleting Service.
*/
public class TestDirectoryDeletingService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(TestDirectoryDeletingService.class);
+
@TempDir
private Path folder;
private OzoneManagerProtocol writeClient;
@@ -66,7 +82,7 @@ public static void setup() {
ExitUtils.disableSystemExit();
}
- private OzoneConfiguration createConfAndInitValues() throws IOException {
+ private OzoneConfiguration createConfAndInitValues(int threadCount) throws
IOException {
OzoneConfiguration conf = new OzoneConfiguration();
File newFolder = folder.toFile();
if (!newFolder.exists()) {
@@ -76,6 +92,7 @@ private OzoneConfiguration createConfAndInitValues() throws
IOException {
ServerUtils.setOzoneMetaDirPath(conf, newFolder.toString());
conf.setTimeDuration(OZONE_DIR_DELETING_SERVICE_INTERVAL, 3000,
TimeUnit.MILLISECONDS);
+ conf.setInt(OZONE_THREAD_NUMBER_DIR_DELETION, threadCount);
conf.set(OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, "4MB");
conf.setQuietMode(false);
@@ -87,12 +104,14 @@ private OzoneConfiguration createConfAndInitValues()
throws IOException {
@AfterEach
public void cleanup() throws Exception {
- om.stop();
+ if (om != null) {
+ om.stop();
+ }
}
@Test
public void testDeleteDirectoryCrossingSizeLimit() throws Exception {
- OzoneConfiguration conf = createConfAndInitValues();
+ OzoneConfiguration conf = createConfAndInitValues(10);
OmTestManagers omTestManagers
= new OmTestManagers(conf);
KeyManager keyManager = omTestManagers.getKeyManager();
@@ -158,4 +177,59 @@ public void testDeleteDirectoryCrossingSizeLimit() throws
Exception {
500, 60000);
assertThat(dirDeletingService.getRunCount().get()).isGreaterThanOrEqualTo(1);
}
+
+ @Test
+ public void testMultithreadedDirectoryDeletion() throws Exception {
+ int threadCount = 10;
+ OzoneConfiguration conf = createConfAndInitValues(threadCount);
+ OmTestManagers omTestManagers
+ = new OmTestManagers(conf);
+ OzoneManager ozoneManager = omTestManagers.getOzoneManager();
+ AtomicBoolean isRunning = new AtomicBoolean(true);
+ try (MockedStatic mockedStatic =
Mockito.mockStatic(CompletableFuture.class, CALLS_REAL_METHODS)) {
+ List<Pair<Supplier, CompletableFuture>> futureList = new ArrayList<>();
+ Thread deletionThread = new Thread(() -> {
+ while (futureList.size() < threadCount) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ LOG.error("Error while sleeping", e);
+ }
+ }
+ for (int i = futureList.size() - 1; i >= 0; i--) {
+ Pair<Supplier, CompletableFuture> pair = futureList.get(i);
+ pair.getLeft().get();
+ assertTrue(isRunning.get());
+ pair.getRight().complete(false);
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ LOG.error("Error while sleeping", e);
+ }
+ }
+ });
+ deletionThread.start();
+
+ mockedStatic
+ .when(() -> CompletableFuture.supplyAsync(any(), any()))
+ .thenAnswer(invocation -> {
+ Supplier<Boolean> supplier = invocation.getArgument(0);
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ futureList.add(Pair.of(supplier, future));
+ return future;
+ });
+ ozoneManager.getKeyManager().getDirDeletingService().suspend();
+ DirectoryDeletingService.DirDeletingTask dirDeletingTask =
+ ozoneManager.getKeyManager().getDirDeletingService().new
DirDeletingTask(null);
+
+ dirDeletingTask.processDeletedDirsForStore(null,
ozoneManager.getKeyManager(), Long.MAX_VALUE, 1);
+ assertThat(futureList).hasSize(threadCount);
+ for (Pair<Supplier, CompletableFuture> pair : futureList) {
+ assertTrue(pair.getRight().isDone());
+ }
+ isRunning.set(false);
+ } finally {
+ ozoneManager.stop();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]