This is an automated email from the ASF dual-hosted git repository.
sshenoy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new a8206e027a6 HDDS-13768. OM should acquire snapshot cache lock before taking checkpoint. (#9129)
a8206e027a6 is described below
commit a8206e027a6c8d523e9a48a16267801e57098a89
Author: Sadanand Shenoy <[email protected]>
AuthorDate: Thu Nov 6 20:09:05 2025 +0530
HDDS-13768. OM should acquire snapshot cache lock before taking checkpoint. (#9129)
---
.../TestOMDbCheckpointServletInodeBasedXfer.java | 183 +++++++++++++++++++--
.../om/OMDBCheckpointServletInodeBasedXfer.java | 88 +++++-----
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 8 +
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 29 ++++
.../hadoop/ozone/om/snapshot/SnapshotCache.java | 1 -
5 files changed, 256 insertions(+), 53 deletions(-)
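
At its core, the change wraps checkpoint creation and the snapshot-data transfer in the
snapshot cache lock, so a concurrent SnapshotPurge transaction cannot delete a snapshot
directory mid-transfer. A minimal sketch of the resulting control flow, using the types and
method names from the diff below (surrounding plumbing elided):

    SnapshotCache snapshotCache = om.getOmSnapshotManager().getSnapshotCache();
    // Lock only when snapshot data is included; try-with-resources skips close()
    // for a null resource, so the plain-checkpoint path takes no lock at all.
    try (UncheckedAutoCloseableSupplier<OMLockDetails> lock =
        includeSnapshotData ? snapshotCache.lock() : null) {
      checkpoint = createAndPrepareCheckpoint(true);
      // ... transfer active om.db contents, compaction logs, and snapshot DBs ...
    } // lock released here; a pending purge may now delete snapshot directories
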
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
index a6ae3eaab21..b936d7ab518 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMDbCheckpointServletInodeBasedXfer.java
@@ -65,8 +65,10 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -102,12 +104,16 @@
import org.apache.hadoop.ozone.om.codec.OMDBDefinition;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.service.DirectoryDeletingService;
import org.apache.hadoop.ozone.om.service.KeyDeletingService;
import org.apache.hadoop.ozone.om.service.SnapshotDeletingService;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -395,7 +401,6 @@ public void testSnapshotDBConsistency() throws Exception {
}
Path snapshotDbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR,
OM_DB_NAME + "-" + snapshotToModify.getSnapshotId());
- deleteWalFiles(snapshotDbDir);
assertTrue(Files.exists(snapshotDbDir));
String value = getValueFromSnapshotDeleteTable(dummyKey, snapshotDbDir.toString());
assertNotNull(value);
@@ -456,6 +461,172 @@ public void testWriteDBToArchive(boolean expectOnlySstFiles) throws Exception {
}
}
+ /**
+ * Verifies that the snapshot cache lock coordinates checkpoint and purge operations,
+ * preventing a race condition on a follower OM where a snapshot directory could be
+ * deleted while the checkpoint is reading snapshot data.
+ *
+ * Test steps:
+ * 1. Create keys
+ * 2. Create snapshot 1
+ * 3. Create snapshot 2
+ * 4. Delete snapshot 2 (marks it as DELETED)
+ * 5. Stop SnapshotDeletingService to prevent automatic purge
+ * 6. Invoke checkpoint servlet (acquires bootstrap lock and snapshot cache lock)
+ * 7. Submit purge request for snapshot 2 during checkpoint processing (simulates Ratis transaction on follower)
+ * 8. Verify purge waits for snapshot cache lock (blocked while checkpoint holds it)
+ * 9. Verify checkpoint completes first and tarball includes snapshot 2 data
+ * 10. Verify purge completes after checkpoint releases snapshot cache lock
+ *
+ * @throws Exception if test setup or execution fails
+ */
+ @Test
+ public void testBootstrapOnFollowerConsistency() throws Exception {
+ String volumeName = "vol" + RandomStringUtils.secure().nextNumeric(5);
+ String bucketName = "buck" + RandomStringUtils.secure().nextNumeric(5);
+ setupCluster();
+ om.getKeyManager().getSnapshotSstFilteringService().pause();
+ om.getKeyManager().getSnapshotDeletingService().suspend();
+ // Create test data and snapshots
+ OzoneBucket bucket = TestDataUtil.createVolumeAndBucket(client, volumeName, bucketName);
+ // Create key before first snapshot
+ TestDataUtil.createKey(bucket, "key1",
+ ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS, ReplicationFactor.ONE),
+ "data1".getBytes(StandardCharsets.UTF_8));
+ client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot1");
+ client.getObjectStore().createSnapshot(volumeName, bucketName, "snapshot2");
+ List<OzoneSnapshot> snapshots = new ArrayList<>();
+ client.getObjectStore().listSnapshot(volumeName, bucketName, "", null)
+ .forEachRemaining(snapshots::add);
+ assertEquals(2, snapshots.size(), "Should have 2 snapshots initially");
+ OzoneSnapshot snapshot1 = snapshots.stream()
+ .filter(snap -> snap.getName().equals("snapshot1"))
+ .findFirst().get();
+ OzoneSnapshot snapshot2 = snapshots.stream()
+ .filter(snap -> snap.getName().equals("snapshot2")).findFirst().get();
+ waitTillSnapshotInDeletedState(volumeName, bucketName, snapshot2);
+ // Setup servlet mocks for checkpoint processing
+ setupMocks();
+ when(requestMock.getParameter(OZONE_DB_CHECKPOINT_INCLUDE_SNAPSHOT_DATA)).thenReturn("true");
+ CountDownLatch purgeSubmitted = new CountDownLatch(1);
+ AtomicLong checkpointEndTime = new AtomicLong(0);
+ AtomicLong purgeEndTime = new AtomicLong(0);
+
+ DBStore dbStore = om.getMetadataManager().getStore();
+ DBStore spyDbStore = spy(dbStore);
+ AtomicReference<DBCheckpoint> capturedCheckpoint = new AtomicReference<>();
+ when(spyDbStore.getCheckpoint(true)).thenAnswer(invocation -> {
+ // Submit purge request in background thread (simulating Ratis transaction on follower)
+ Thread purgeThread = new Thread(() -> {
+ try {
+ String snapshotTableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot2.getName());
+ // Construct SnapshotPurge request
+ OzoneManagerProtocolProtos.SnapshotPurgeRequest snapshotPurgeRequest =
+ OzoneManagerProtocolProtos.SnapshotPurgeRequest.newBuilder()
+ .addSnapshotDBKeys(snapshotTableKey)
+ .build();
+
+ OzoneManagerProtocolProtos.OMRequest omRequest = OzoneManagerProtocolProtos.OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.SnapshotPurge)
+ .setSnapshotPurgeRequest(snapshotPurgeRequest)
+ .setClientId(UUID.randomUUID().toString())
+ .build();
+
+ purgeSubmitted.countDown();
+ long purgeStartTime = System.currentTimeMillis();
+ // Submit via Ratis (simulating follower receiving transaction)
+ // This will trigger OMSnapshotPurgeResponse which needs SNAPSHOT_DB_LOCK
+ ClientId clientId = ClientId.randomId();
+ long callId = 1;
+ OzoneManagerProtocolProtos.OMResponse
+ response = om.getOmRatisServer().submitRequest(omRequest, clientId, callId);
+
+ if (response.getSuccess()) {
+ // Wait for purge to complete (snapshot removed from table)
+ GenericTestUtils.waitFor(() -> {
+ try {
+ boolean purged = om.getMetadataManager().getSnapshotInfoTable().get(snapshotTableKey) == null;
+ if (purged) {
+ purgeEndTime.set(System.currentTimeMillis());
+ long duration = purgeEndTime.get() - purgeStartTime;
+ LOG.info("Purge completed in {} ms", duration);
+ }
+ return purged;
+ } catch (Exception ex) {
+ return false;
+ }
+ }, 100, 40_000);
+ }
+ } catch (Exception e) {
+ LOG.error("Purge submission failed", e);
+ }
+ });
+ purgeThread.start();
+
+ // Wait for purge request to be submitted
+ assertTrue(purgeSubmitted.await(2, TimeUnit.SECONDS), "Purge should be submitted");
+ // Small delay to ensure purge request reaches state machine
+ Thread.sleep(200);
+ DBCheckpoint checkpoint = spy(dbStore.getCheckpoint(true));
+ doNothing().when(checkpoint).cleanupCheckpoint(); // Don't cleanup for verification
+ capturedCheckpoint.set(checkpoint);
+ return checkpoint;
+ });
+ // Initialize servlet
+ doCallRealMethod().when(omDbCheckpointServletMock).initialize(any(), any(),
+ eq(false), any(), any(), eq(false));
+ omDbCheckpointServletMock.initialize(spyDbStore, om.getMetrics().getDBCheckpointMetrics(),
+ false, om.getOmAdminUsernames(), om.getOmAdminGroups(), false);
+ when(responseMock.getOutputStream()).thenReturn(servletOutputStream);
+ // Process checkpoint servlet
+ omDbCheckpointServletMock.doGet(requestMock, responseMock);
+ checkpointEndTime.set(System.currentTimeMillis()); // record checkpoint completion for the ordering assertion below
+ String testDirName = folder.resolve("testDir").toString();
+ String newDbDirName = testDirName + OM_KEY_PREFIX + OM_DB_NAME;
+ File newDbDir = new File(newDbDirName);
+ assertTrue(newDbDir.mkdirs());
+ FileUtil.unTar(tempFile, newDbDir);
+ OmSnapshotUtils.createHardLinks(newDbDir.toPath(), true);
+ Path snapshot1DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR,
+ OM_DB_NAME + "-" + snapshot1.getSnapshotId());
+ Path snapshot2DbDir = Paths.get(newDbDir.toPath().toString(), OM_SNAPSHOT_CHECKPOINT_DIR,
+ OM_DB_NAME + "-" + snapshot2.getSnapshotId());
+ assertTrue(purgeEndTime.get() >= checkpointEndTime.get(),
+ "Purge should complete after checkpoint releases snapshot cache lock");
+
+ // Verify snapshot is purged
+ List<OzoneSnapshot> snapshotsAfter = new ArrayList<>();
+ client.getObjectStore().listSnapshot(volumeName, bucketName, "", null)
+ .forEachRemaining(snapshotsAfter::add);
+ assertEquals(1, snapshotsAfter.size(), "Snapshot2 should be purged");
+ boolean snapshot1IncludedInCheckpoint = Files.exists(snapshot1DbDir);
+ boolean snapshot2IncludedInCheckpoint = Files.exists(snapshot2DbDir);
+ assertTrue(snapshot1IncludedInCheckpoint && snapshot2IncludedInCheckpoint,
+ "Checkpoint should include both snapshot1 and snapshot2 data");
+ // Cleanup
+ if (capturedCheckpoint.get() != null) {
+ capturedCheckpoint.get().cleanupCheckpoint();
+ }
+ }
+
+ private void waitTillSnapshotInDeletedState(String volumeName, String bucketName, OzoneSnapshot snapshot)
+ throws IOException, InterruptedException, TimeoutException {
+ String snapshotTableKey = SnapshotInfo.getTableKey(volumeName, bucketName, snapshot.getName());
+ // delete snapshot and wait for it to reach the SNAPSHOT_DELETED state
+ client.getObjectStore().deleteSnapshot(volumeName, bucketName, snapshot.getName());
+ GenericTestUtils.waitFor(() -> {
+ try {
+ SnapshotInfo snapshotInfo = om.getMetadataManager().getSnapshotInfoTable().get(snapshotTableKey);
+ return snapshotInfo != null &&
+ snapshotInfo.getSnapshotStatus().name().equals(SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED.name());
+ } catch (Exception ex) {
+ LOG.error("Exception while querying snapshot info for key {} in cache", snapshotTableKey, ex);
+ return false;
+ }
+ }, 100, 30_000);
+ om.awaitDoubleBufferFlush();
+ }
+
@Test
public void testBootstrapLockCoordination() throws Exception {
// Create mocks for all background services
@@ -686,16 +857,6 @@ public void testCheckpointIncludesSnapshotsFromFrozenState() throws Exception {
}
}
- private static void deleteWalFiles(Path snapshotDbDir) throws IOException {
- try (Stream<Path> filesInTarball = Files.list(snapshotDbDir)) {
- List<Path> files = filesInTarball.filter(p -> p.toString().contains(".log"))
- .collect(Collectors.toList());
- for (Path p : files) {
- Files.delete(p);
- }
- }
- }
-
private static Set<Path> getAllPathsInTarball(File newDbDir) throws IOException {
Set<Path> allPathsInTarball = new HashSet<>();
try (Stream<Path> filesInTarball = Files.list(newDbDir.toPath())) {
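
The new test coordinates the two racing operations with a CountDownLatch and wall-clock
timestamps. The distilled pattern looks like the sketch below, where submitPurgeAndAwait()
and runCheckpointUnderLock() are hypothetical stand-ins for the Ratis purge submission and
the servlet doGet() call in the test above:

    CountDownLatch purgeSubmitted = new CountDownLatch(1);
    AtomicLong checkpointEndTime = new AtomicLong();
    AtomicLong purgeEndTime = new AtomicLong();

    Thread purgeThread = new Thread(() -> {
      purgeSubmitted.countDown();            // signal that the purge is in flight
      submitPurgeAndAwait();                 // hypothetical: blocks on the snapshot cache lock
      purgeEndTime.set(System.currentTimeMillis());
    });
    purgeThread.start();
    assertTrue(purgeSubmitted.await(2, TimeUnit.SECONDS));

    runCheckpointUnderLock();                // hypothetical: holds the lock for the whole transfer
    checkpointEndTime.set(System.currentTimeMillis());
    purgeThread.join();
    assertTrue(purgeEndTime.get() >= checkpointEndTime.get());
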
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
index 748329be83a..3291c37a0b8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMDBCheckpointServletInodeBasedXfer.java
@@ -28,7 +28,6 @@
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_MAX_TOTAL_SST_SIZE_KEY;
import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPath;
-import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK;
import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.includeSnapshotData;
import static org.apache.hadoop.ozone.om.snapshot.OMDBCheckpointUtils.logEstimatedTarballSize;
import static org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils.DATA_PREFIX;
@@ -54,7 +53,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.servlet.ServletException;
@@ -72,12 +70,15 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Time;
import org.apache.ozone.compaction.log.CompactionLogEntry;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -256,29 +257,44 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
if (shouldContinue) {
// we finished transferring files from snapshot DB's by now and
// this is the last step where we transfer the active om.db contents
- // get the list of sst files of the checkpoint.
- checkpoint = createAndPrepareCheckpoint(true);
- List<Path> sstBackupFiles = extractSSTFilesFromCompactionLog(checkpoint);
- // unlimited files as we want the Active DB contents to be transferred in a single batch
- maxTotalSstSize.set(Long.MAX_VALUE);
- Path checkpointDir = checkpoint.getCheckpointLocation();
Map<String, String> hardLinkFileMap = new HashMap<>();
- writeDBToArchive(sstFilesToExclude, checkpointDir,
- maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
- if (includeSnapshotData) {
- // get the list of snapshots from the checkpoint
- try (OmMetadataManagerImpl checkpointMetadataManager = OmMetadataManagerImpl
- .createCheckpointMetadataManager(om.getConfiguration(), checkpoint)) {
- snapshotPaths = getSnapshotDirsFromDB(omMetadataManager, checkpointMetadataManager,
- snapshotLocalDataManager);
- }
- writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
+ SnapshotCache snapshotCache = om.getOmSnapshotManager().getSnapshotCache();
+ /*
+ * Acquire the snapshot cache lock when includeSnapshotData is true to prevent race conditions
+ * between checkpoint operations and snapshot purge operations. Without this lock, a purge
+ * operation (e.g., from a Ratis transaction on a follower OM) could delete snapshot directories
+ * while the checkpoint is reading snapshot data, leading to FileNotFoundException or corrupted
+ * checkpoint data. The lock ensures the checkpoint finishes reading snapshot data before a purge
+ * can delete the snapshot directory.
+ *
+ * When includeSnapshotData is false, the lock is set to null and no locking is performed.
+ * In this case, the try-with-resources block does not call close() on any resource,
+ * which is intentional because snapshot consistency is not required.
+ */
+ try (UncheckedAutoCloseableSupplier<OMLockDetails> lock = includeSnapshotData ? snapshotCache.lock() : null) {
+ // get the list of sst files of the checkpoint.
+ checkpoint = createAndPrepareCheckpoint(true);
+ // unlimited files as we want the Active DB contents to be transferred in a single batch
+ maxTotalSstSize.set(Long.MAX_VALUE);
+ Path checkpointDir = checkpoint.getCheckpointLocation();
+ writeDBToArchive(sstFilesToExclude, checkpointDir, maxTotalSstSize, archiveOutputStream, tmpdir,
hardLinkFileMap, false);
- writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(),
- maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap, false);
- // This is done to ensure all data to be copied correctly is flushed in the snapshot DB
- transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize,
- archiveOutputStream, hardLinkFileMap);
+ if (includeSnapshotData) {
+ List<Path> sstBackupFiles = extractSSTFilesFromCompactionLog(checkpoint);
+ // get the list of snapshots from the checkpoint
+ try (OmMetadataManagerImpl checkpointMetadataManager = OmMetadataManagerImpl
+ .createCheckpointMetadataManager(om.getConfiguration(), checkpoint)) {
+ snapshotPaths = getSnapshotDirsFromDB(omMetadataManager, checkpointMetadataManager,
+ snapshotLocalDataManager);
+ }
+ writeDBToArchive(sstFilesToExclude, getCompactionLogDir(), maxTotalSstSize, archiveOutputStream, tmpdir,
+ hardLinkFileMap, false);
+ writeDBToArchive(sstFilesToExclude, sstBackupFiles.stream(), maxTotalSstSize, archiveOutputStream, tmpdir,
+ hardLinkFileMap, false);
+ // This is done to ensure all data to be copied correctly is flushed in the snapshot DB
+ transferSnapshotData(sstFilesToExclude, tmpdir, snapshotPaths, maxTotalSstSize, archiveOutputStream,
+ hardLinkFileMap);
+ }
}
writeHardlinkFile(getConf(), hardLinkFileMap, archiveOutputStream);
includeRatisSnapshotCompleteFlag(archiveOutputStream);
@@ -307,25 +323,15 @@ public void writeDbDataToStream(HttpServletRequest request, OutputStream destina
void transferSnapshotData(Set<String> sstFilesToExclude, Path tmpdir, Set<Path> snapshotPaths,
AtomicLong maxTotalSstSize, ArchiveOutputStream<TarArchiveEntry> archiveOutputStream,
Map<String, String> hardLinkFileMap) throws IOException {
- OzoneManager om = (OzoneManager) getServletContext().getAttribute(OzoneConsts.OM_CONTEXT_ATTRIBUTE);
- OMMetadataManager omMetadataManager = om.getMetadataManager();
for (Path snapshotDir : snapshotPaths) {
- String snapshotId = OmSnapshotManager.extractSnapshotIDFromCheckpointDirName(snapshotDir.toString());
- omMetadataManager.getLock().acquireReadLock(SNAPSHOT_DB_LOCK, snapshotId);
- try {
- // invalidate closes the snapshot DB
- om.getOmSnapshotManager().invalidateCacheEntry(UUID.fromString(snapshotId));
- writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir,
- hardLinkFileMap, false);
- Path snapshotLocalPropertyYaml = Paths.get(
- OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotDir));
- if (Files.exists(snapshotLocalPropertyYaml)) {
- File yamlFile = snapshotLocalPropertyYaml.toFile();
- hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName());
- linkAndIncludeFile(yamlFile, yamlFile.getName(), archiveOutputStream, tmpdir);
- }
- } finally {
- omMetadataManager.getLock().releaseReadLock(SNAPSHOT_DB_LOCK, snapshotId);
+ writeDBToArchive(sstFilesToExclude, snapshotDir, maxTotalSstSize, archiveOutputStream, tmpdir, hardLinkFileMap,
+ false);
+ Path snapshotLocalPropertyYaml = Paths.get(
+ OmSnapshotLocalDataManager.getSnapshotLocalPropertyYamlPath(snapshotDir));
+ if (Files.exists(snapshotLocalPropertyYaml)) {
+ File yamlFile = snapshotLocalPropertyYaml.toFile();
+ hardLinkFileMap.put(yamlFile.getAbsolutePath(), yamlFile.getName());
+ linkAndIncludeFile(yamlFile, yamlFile.getName(), archiveOutputStream, tmpdir);
}
}
}
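
The conditional lock above leans on a Java language guarantee: try-with-resources calls
close() only on resources that initialized to a non-null value (JLS 14.20.3), so the
includeSnapshotData == false path performs no locking and no close. A self-contained
illustration of that behavior:

    public class NullResourceDemo {
      public static void main(String[] args) throws Exception {
        boolean locked = false;
        try (AutoCloseable lock = locked ? () -> System.out.println("closed") : null) {
          System.out.println("body runs, lock = " + lock);  // prints "lock = null"
        } // no NullPointerException: close() is skipped for the null resource
      }
    }
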
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index 4fcb8ed22e8..06215c8df76 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -485,6 +485,14 @@ public int getSnapshotCacheSize() {
return snapshotCache == null ? 0 : snapshotCache.size();
}
+ /**
+ * Get snapshot cache instance.
+ * @return snapshotCache.
+ */
+ public SnapshotCache getSnapshotCache() {
+ return snapshotCache;
+ }
+
/**
* Immediately invalidate all entries and close their DB instances in cache.
*/
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
index 04515dcd728..1f77f6f5b49 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerDoubleBuffer.java
@@ -83,6 +83,7 @@ public final class OzoneManagerDoubleBuffer {
private final Daemon daemon;
/** Is the {@link #daemon} running? */
private final AtomicBoolean isRunning = new AtomicBoolean(false);
+ private final AtomicBoolean isPaused = new AtomicBoolean(false);
/** Notify flush operations are completed by the {@link #daemon}. */
private final FlushNotifier flushNotifier;
@@ -211,6 +212,22 @@ public OzoneManagerDoubleBuffer start() {
return this;
}
+ @VisibleForTesting
+ public void pause() {
+ synchronized (this) {
+ isPaused.set(true);
+ this.notifyAll();
+ }
+ }
+
+ @VisibleForTesting
+ public void unpause() {
+ synchronized (this) {
+ isPaused.set(false);
+ this.notifyAll();
+ }
+ }
+
/**
* Acquires the given number of permits from unFlushedTransactions,
* blocking until all are available, or the thread is interrupted.
@@ -277,6 +294,18 @@ private void addToBatchTransactionInfoWithTrace(String parentName,
@VisibleForTesting
public void flushTransactions() {
while (isRunning.get() && canFlush()) {
+ // Check if paused
+ synchronized (this) {
+ while (isPaused.get() && isRunning.get()) {
+ try {
+ this.wait();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ }
+
flushCurrentBuffer();
}
}
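
The pause()/unpause() hooks use the standard wait/notify guarded-block idiom so a test can
freeze double-buffer flushes at a precise point. A standalone demonstration of the same
idiom, independent of the OM classes (PausableWorker is illustrative, not part of the patch):

    import java.util.concurrent.atomic.AtomicBoolean;

    public class PausableWorker {
      private final AtomicBoolean isPaused = new AtomicBoolean(false);

      public void pause()   { synchronized (this) { isPaused.set(true);  notifyAll(); } }
      public void unpause() { synchronized (this) { isPaused.set(false); notifyAll(); } }

      // Mirrors the check added to flushTransactions(): park while paused.
      void workLoop() throws InterruptedException {
        while (!Thread.currentThread().isInterrupted()) {
          synchronized (this) {
            while (isPaused.get()) {
              wait();               // woken by unpause()'s notifyAll()
            }
          }
          // ... do one unit of work (one buffer flush in the OM case) ...
        }
      }
    }
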
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index ce79c32fc4e..ff0b04b0541 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -151,7 +151,6 @@ public void invalidate(UUID key) {
LOG.debug("SnapshotId: '{}' does not exist in snapshot cache.", k);
} else {
try {
- v.get().getMetadataManager().getStore().flushDB();
v.get().close();
} catch (IOException e) {
throw new IllegalStateException("Failed to close snapshotId: " + key, e);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]