This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 4e5e5a0e0ad HDDS-13779. Correctly initialize the bootstrap lock in
OMDBCheckpointServletInodeBasedXfer. (#9142)
4e5e5a0e0ad is described below
commit 4e5e5a0e0ad347b6e1df302cd24d1fab4bfc4761
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Tue Oct 14 23:43:31 2025 +0530
HDDS-13779. Correctly initialize the bootstrap lock in
OMDBCheckpointServletInodeBasedXfer. (#9142)
---
.../TestOMDbCheckpointServletInodeBasedXfer.java | 148 +++++++++++++++++++++
.../om/OMDBCheckpointServletInodeBasedXfer.java | 7 +
2 files changed, 155 insertions(+)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
index ec2080e9cf4..0f5c8bae4b4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
@@ -62,12 +62,15 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import javax.servlet.ServletConfig;
import javax.servlet.ServletContext;
import javax.servlet.ServletOutputStream;
import javax.servlet.WriteListener;
@@ -96,8 +99,12 @@
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
+import org.apache.hadoop.ozone.om.service.KeyDeletingService;
+import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -110,6 +117,8 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Class used for testing the OM DB Checkpoint provider servlet using inode
based transfer logic.
@@ -128,6 +137,8 @@ public class TestOMDbCheckpointServletInodeBasedXfer {
private ServletOutputStream servletOutputStream;
private File tempFile;
private static final AtomicInteger COUNTER = new AtomicInteger();
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestOMDbCheckpointServletInodeBasedXfer.class);
@BeforeEach
void init() throws Exception {
@@ -438,6 +449,143 @@ public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception {
}
}
+ @Test
+ public void testBootstrapLockCoordination() throws Exception {
+ // Create mocks for all background services
+ KeyDeletingService mockDeletingService = mock(KeyDeletingService.class);
+ DirectoryDeletingService mockDirDeletingService =
mock(DirectoryDeletingService.class);
+ SstFilteringService mockFilteringService = mock(SstFilteringService.class);
+ SnapshotDeletingService mockSnapshotDeletingService =
mock(SnapshotDeletingService.class);
+ RocksDBCheckpointDiffer mockCheckpointDiffer =
mock(RocksDBCheckpointDiffer.class);
+ // Create mock locks for each service
+ BootstrapStateHandler.Lock mockDeletingLock =
mock(BootstrapStateHandler.Lock.class);
+ BootstrapStateHandler.Lock mockDirDeletingLock =
mock(BootstrapStateHandler.Lock.class);
+ BootstrapStateHandler.Lock mockFilteringLock =
mock(BootstrapStateHandler.Lock.class);
+ BootstrapStateHandler.Lock mockSnapshotDeletingLock =
mock(BootstrapStateHandler.Lock.class);
+ BootstrapStateHandler.Lock mockCheckpointDifferLock =
mock(BootstrapStateHandler.Lock.class);
+ // Configure service mocks to return their respective locks
+
when(mockDeletingService.getBootstrapStateLock()).thenReturn(mockDeletingLock);
+
when(mockDirDeletingService.getBootstrapStateLock()).thenReturn(mockDirDeletingLock);
+
when(mockFilteringService.getBootstrapStateLock()).thenReturn(mockFilteringLock);
+
when(mockSnapshotDeletingService.getBootstrapStateLock()).thenReturn(mockSnapshotDeletingLock);
+
when(mockCheckpointDiffer.getBootstrapStateLock()).thenReturn(mockCheckpointDifferLock);
+ // Mock KeyManager and its services
+ KeyManager mockKeyManager = mock(KeyManager.class);
+ when(mockKeyManager.getDeletingService()).thenReturn(mockDeletingService);
+
when(mockKeyManager.getDirDeletingService()).thenReturn(mockDirDeletingService);
+
when(mockKeyManager.getSnapshotSstFilteringService()).thenReturn(mockFilteringService);
+
when(mockKeyManager.getSnapshotDeletingService()).thenReturn(mockSnapshotDeletingService);
+ // Mock OMMetadataManager and Store
+ OMMetadataManager mockMetadataManager = mock(OMMetadataManager.class);
+ DBStore mockStore = mock(DBStore.class);
+ when(mockMetadataManager.getStore()).thenReturn(mockStore);
+
when(mockStore.getRocksDBCheckpointDiffer()).thenReturn(mockCheckpointDiffer);
+ // Mock OzoneManager
+ OzoneManager mockOM = mock(OzoneManager.class);
+ when(mockOM.getKeyManager()).thenReturn(mockKeyManager);
+ when(mockOM.getMetadataManager()).thenReturn(mockMetadataManager);
+ // Create the actual Lock instance (this tests the real implementation)
+ OMDBCheckpointServlet.Lock bootstrapLock = new
OMDBCheckpointServlet.Lock(mockOM);
+ // Test successful lock acquisition
+ BootstrapStateHandler.Lock result = bootstrapLock.lock();
+ // Verify all service locks were acquired
+ verify(mockDeletingLock).lock();
+ verify(mockDirDeletingLock).lock();
+ verify(mockFilteringLock).lock();
+ verify(mockSnapshotDeletingLock).lock();
+ verify(mockCheckpointDifferLock).lock();
+ // Verify double buffer flush was called
+ verify(mockOM).awaitDoubleBufferFlush();
+ // Verify the lock returns itself
+ assertEquals(bootstrapLock, result);
+ // Test unlock
+ bootstrapLock.unlock();
+ // Verify all service locks were released
+ verify(mockDeletingLock).unlock();
+ verify(mockDirDeletingLock).unlock();
+ verify(mockFilteringLock).unlock();
+ verify(mockSnapshotDeletingLock).unlock();
+ verify(mockCheckpointDifferLock).unlock();
+ }
+
+ /**
+ * Verifies that bootstrap lock acquisition blocks background services
during checkpoint creation,
+ * preventing race conditions between checkpoint and service operations.
+ */
+ @Test
+ public void testBootstrapLockBlocksMultipleServices() throws Exception {
+ setupCluster();
+ // Initialize servlet
+ OMDBCheckpointServletInodeBasedXfer servlet = new
OMDBCheckpointServletInodeBasedXfer();
+ ServletConfig servletConfig = mock(ServletConfig.class);
+ ServletContext servletContext = mock(ServletContext.class);
+ when(servletConfig.getServletContext()).thenReturn(servletContext);
+
when(servletContext.getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE)).thenReturn(om);
+ servlet.init(servletConfig);
+
+ BootstrapStateHandler.Lock bootstrapLock = servlet.getBootstrapStateLock();
+ // Test multiple services being blocked
+ CountDownLatch bootstrapAcquired = new CountDownLatch(1);
+ CountDownLatch allServicesCompleted = new CountDownLatch(3); // 3
background services
+ AtomicInteger servicesBlocked = new AtomicInteger(0);
+ AtomicInteger servicesSucceeded = new AtomicInteger(0);
+ // Checkpoint thread holds bootstrap lock
+ Thread checkpointThread = new Thread(() -> {
+ try {
+ LOG.info("Acquiring bootstrap lock for checkpoint...");
+ BootstrapStateHandler.Lock acquired = bootstrapLock.lock();
+ bootstrapAcquired.countDown();
+ Thread.sleep(3000); // Hold for 3 seconds
+ LOG.info("Releasing bootstrap lock...");
+ acquired.unlock();
+ } catch (Exception e) {
+ fail("Checkpoint failed: " + e.getMessage());
+ }
+ });
+
+ BiFunction<String, BootstrapStateHandler, Thread> createServiceThread =
+ (serviceName, service) -> new Thread(() -> {
+ try {
+ bootstrapAcquired.await();
+ if (service != null) {
+ LOG.info("{} : Trying to acquire lock...", serviceName);
+ servicesBlocked.incrementAndGet();
+ BootstrapStateHandler.Lock serviceLock =
service.getBootstrapStateLock();
+ serviceLock.lock(); // Should block!
+ servicesBlocked.decrementAndGet();
+ servicesSucceeded.incrementAndGet();
+ LOG.info(" {} : Lock acquired!", serviceName);
+ serviceLock.unlock();
+ }
+ allServicesCompleted.countDown();
+ } catch (Exception e) {
+ LOG.error("{} failed", serviceName, e);
+ allServicesCompleted.countDown();
+ }
+ });
+ // Start all threads
+ checkpointThread.start();
+ Thread keyDeletingThread = createServiceThread.apply("KeyDeletingService",
+ om.getKeyManager().getDeletingService());
+ Thread dirDeletingThread =
createServiceThread.apply("DirectoryDeletingService",
+ om.getKeyManager().getDirDeletingService());
+ Thread snapshotDeletingThread =
createServiceThread.apply("SnapshotDeletingService",
+ om.getKeyManager().getSnapshotDeletingService());
+ keyDeletingThread.start();
+ dirDeletingThread.start();
+ snapshotDeletingThread.start();
+ // Wait a bit, then verify multiple services are blocked
+ Thread.sleep(1000);
+ int blockedCount = servicesBlocked.get();
+ assertTrue(blockedCount > 0, "At least one service should be blocked");
+ assertEquals(0, servicesSucceeded.get(), "No services should have
succeeded yet");
+ // Wait for completion
+ assertTrue(allServicesCompleted.await(10, TimeUnit.SECONDS));
+ // Verify all services eventually succeeded
+ assertEquals(0, servicesBlocked.get(), "No services should be blocked
anymore");
+ assertTrue(servicesSucceeded.get() > 0, "Services should have succeeded
after lock release");
+ }
+
private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
List<Path> files = filesInTarball.filter(p ->
p.toString().contains(".log"))
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index 8a58ed6aa76..7b5fe844d6a 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -94,6 +94,7 @@ public class OMDBCheckpointServletInodeBasedXfer extends DBCheckpointServlet {
protected static final Logger LOG =
LoggerFactory.getLogger(OMDBCheckpointServletInodeBasedXfer.class);
private static final long serialVersionUID = 1L;
+ private transient BootstrapStateHandler.Lock lock;
@Override
public void init() throws ServletException {
@@ -124,6 +125,12 @@ public void init() throws ServletException {
allowedUsers,
allowedGroups,
om.isSpnegoEnabled());
+ lock = new OMDBCheckpointServlet.Lock(om);
+ }
+
+  /**
+   * Returns the composite bootstrap lock that {@link #init()} builds from the
+   * Ozone Manager.
+   *
+   * @return the servlet's bootstrap lock, or {@code null} if {@code init()} has
+   *     not assigned it yet
+   */
+  @Override
+  public BootstrapStateHandler.Lock getBootstrapStateLock() {
+    return this.lock;
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]