This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 299b2948095 HDDS-13009. Background snapshot defrag service (#9324)
299b2948095 is described below
commit 299b294809521712a37e33def5ac6717985409b7
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Nov 24 23:22:48 2025 -0500
HDDS-13009. Background snapshot defrag service (#9324)
---
.../org/apache/hadoop/hdds/utils/db/DBStore.java | 8 +
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 24 +
.../apache/hadoop/ozone/om/OMMetadataManager.java | 8 +
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 12 +-
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 20 +-
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 60 ++
.../response/snapshot/OMSnapshotPurgeResponse.java | 40 +-
.../hadoop/ozone/om/snapshot/SnapshotCache.java | 24 +-
.../om/snapshot/defrag/SnapshotDefragService.java | 619 ++++++++++++++++-----
.../snapshot/defrag/TestSnapshotDefragService.java | 212 +++++++
.../snapshot/diff/delta/TestRDBDifferComputer.java | 2 +
11 files changed, 844 insertions(+), 185 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index 0fb91f42d90..f71ffe42197 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -145,6 +145,12 @@ <KEY, VALUE> TypedTable<KEY, VALUE> getTable(
*/
Map<Integer, String> getTableNames();
+ /**
+ * Drops the specified table.
+ * @param tableName - Name of the table to drop.
+ */
+ void dropTable(String tableName) throws RocksDatabaseException;
+
/**
* Get data written to DB since a specific sequence number.
*/
@@ -162,4 +168,6 @@ DBUpdatesWrapper getUpdatesSince(long sequenceNumber, long
limitCount)
* @return true if the DB is closed.
*/
boolean isClosed();
+
+ String getSnapshotsParentDir();
}
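For illustration, a minimal usage sketch of the new dropTable contract (the
store handle and table name are assumed here; only the method signature comes
from this patch):

    // Hedged sketch, not part of the patch: truncating a column family via
    // DBStore#dropTable. The drop is permanent, so reinitialize the whole
    // store if the table needs to be used again.
    void truncateColumnFamily(DBStore store, String tableName)
        throws RocksDatabaseException {
      store.dropTable(tableName);
    }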
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index e3853a84211..8e2e2b85426 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -189,6 +189,7 @@ public String getSnapshotMetadataDir() {
return dbLocation.getParent() + OM_KEY_PREFIX + OM_SNAPSHOT_DIFF_DIR;
}
+ @Override
public String getSnapshotsParentDir() {
return snapshotsParentDir;
}
@@ -332,6 +333,29 @@ public Map<Integer, String> getTableNames() {
return db.getColumnFamilyNames();
}
+ /**
+ * Drops a table from the database by removing its associated column family.
+ * <p>
+ * <b>Warning:</b> This operation should be used with extreme caution. If
the table needs to be used again,
+ * it is recommended to reinitialize the entire DB store, as the column
family will be permanently
+ * removed from the database. This method is suitable for truncating a
RocksDB column family in a single operation.
+ *
+ * @param tableName the name of the table to be dropped
+ * @throws RocksDatabaseException if an error occurs while attempting to
drop the table
+ */
+ @Override
+ public void dropTable(String tableName) throws RocksDatabaseException {
+ ColumnFamily columnFamily = db.getColumnFamily(tableName);
+ if (columnFamily != null) {
+ try {
+
db.getManagedRocksDb().get().dropColumnFamily(columnFamily.getHandle());
+ } catch (RocksDBException e) {
+ throw new RocksDatabaseException("Failed to drop " + tableName, e);
+ }
+ }
+ }
+
public Collection<ColumnFamily> getColumnFamilies() {
return db.getExtraColumnFamilies();
}
diff --git
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index baac362da74..461324d0022 100644
---
a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++
b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -21,6 +21,7 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.nio.file.Path;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
@@ -85,6 +86,13 @@ public interface OMMetadataManager extends DBStoreHAManager,
AutoCloseable {
@VisibleForTesting
DBStore getStore();
+ /**
+ * Retrieves the parent directory of all the snapshots in the system.
+ *
+ * @return a Path object representing the parent directory of all snapshots.
+ */
+ Path getSnapshotParentDir();
+
/**
* Returns the OzoneManagerLock used on Metadata DB.
*
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index b5137ed3d61..a0dd69a9752 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@ -421,10 +421,14 @@ public void startSnapshotDefragService(OzoneConfiguration
conf) {
OZONE_SNAPSHOT_DEFRAG_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- snapshotDefragService =
- new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS,
- serviceTimeout, ozoneManager, conf);
- snapshotDefragService.start();
+ try {
+ snapshotDefragService =
+ new SnapshotDefragService(serviceInterval, TimeUnit.MILLISECONDS,
+ serviceTimeout, ozoneManager, conf);
+ snapshotDefragService.start();
+ } catch (IOException e) {
+ LOG.error("Error starting Snapshot Defrag Service", e);
+ }
} else {
LOG.info("SnapshotDefragService is disabled. Snapshot defragmentation
will not run periodically.");
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 6e79ca25ac8..aeaf860c964 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -234,6 +234,11 @@ protected OmMetadataManagerImpl() {
public static OmMetadataManagerImpl createCheckpointMetadataManager(
OzoneConfiguration conf, DBCheckpoint checkpoint) throws IOException {
+ return createCheckpointMetadataManager(conf, checkpoint, true);
+ }
+
+ public static OmMetadataManagerImpl createCheckpointMetadataManager(
+ OzoneConfiguration conf, DBCheckpoint checkpoint, boolean readOnly)
throws IOException {
Path path = checkpoint.getCheckpointLocation();
Path parent = path.getParent();
if (parent == null) {
@@ -246,7 +251,11 @@ public static OmMetadataManagerImpl
createCheckpointMetadataManager(
throw new IllegalStateException("DB checkpoint dir name should not "
+ "have been null. Checkpoint path is " + path);
}
- return new OmMetadataManagerImpl(conf, dir, name.toString());
+ return new OmMetadataManagerImpl(conf, dir, name.toString(), readOnly);
+ }
+
+ protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String
name) throws IOException {
+ this(conf, dir, name, true);
}
/**
@@ -257,7 +266,7 @@ public static OmMetadataManagerImpl
createCheckpointMetadataManager(
* @param name - Checkpoint directory name.
* @throws IOException
*/
- protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String
name)
+ protected OmMetadataManagerImpl(OzoneConfiguration conf, File dir, String
name, boolean readOnly)
throws IOException {
lock = new OmReadOnlyLock();
hierarchicalLockManager = new ReadOnlyHierarchicalResourceLockManager();
@@ -265,7 +274,7 @@ protected OmMetadataManagerImpl(OzoneConfiguration conf,
File dir, String name)
int maxOpenFiles = conf.getInt(OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES,
OZONE_OM_SNAPSHOT_DB_MAX_OPEN_FILES_DEFAULT);
this.store = newDBStoreBuilder(conf, name, dir)
- .setOpenReadOnly(true)
+ .setOpenReadOnly(readOnly)
.disableDefaultCFAutoCompaction(true)
.setMaxNumberOfOpenFiles(maxOpenFiles)
.setEnableCompactionDag(false)
@@ -519,6 +528,11 @@ public DBStore getStore() {
return store;
}
+ @Override
+ public Path getSnapshotParentDir() {
+ return Paths.get(store.getSnapshotsParentDir());
+ }
+
/**
* Given a volume return the corresponding DB key.
*
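For illustration, a hedged sketch of the new read-write overload in use; conf
and checkpoint are assumed to be in scope, and the table name is hypothetical:

    // Open a checkpoint read-write (readOnly = false) so column families can
    // be dropped; close() may throw Exception, so handle or declare it.
    try (OmMetadataManagerImpl mm =
        OmMetadataManagerImpl.createCheckpointMetadataManager(conf, checkpoint, false)) {
      mm.getStore().dropTable("exampleTable"); // hypothetical table name
    }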
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index c3b9feae77f..a1c7aa918fa 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.om;
+import static org.apache.commons.io.file.PathUtils.deleteDirectory;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
import static
org.apache.hadoop.hdds.utils.db.DBStoreBuilder.DEFAULT_COLUMN_FAMILY_NAME;
@@ -97,6 +98,7 @@
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.SnapshotDiffJob;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.service.SnapshotDiffCleanupService;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
import org.apache.hadoop.ozone.om.snapshot.SnapshotCache;
@@ -512,6 +514,64 @@ public void invalidateCacheEntry(UUID key) {
}
}
+ /**
+ * Deletes the snapshot checkpoint directories for a given snapshot ID up to
+ * the specified maximum version. This method ensures that all directories
+ * containing checkpoint data for the specified snapshot ID up to the max
+ * version are removed in a controlled manner.
+ *
+ * @param snapshotId The unique identifier of the snapshot whose checkpoint
+ * directories are to be deleted.
+ * @param maxVersion The maximum version of checkpoint directories to delete.
+ * If a value less than 0 is provided, it defaults to the
+ * current maximum version of the snapshot.
+ * @throws IOException If there is a failure acquiring the snapshot database
+ * lock or while deleting directories.
+ * @throws IllegalArgumentException If the specified maxVersion is greater
+ * than the current maximum version of the
+ * snapshot.
+ */
+ public void deleteSnapshotCheckpointDirectories(UUID snapshotId, int
maxVersion) throws IOException {
+ // Acquire the snapshot dbHandle lock before removing older versions to
+ // ensure all readers are done using the snapshot DB.
+ try (UncheckedAutoCloseableSupplier<OMLockDetails> lock =
getSnapshotCache().lock(snapshotId)) {
+ if (!lock.get().isLockAcquired()) {
+ throw new IOException("Failed to acquire dbHandlelock on snapshot: " +
snapshotId);
+ }
+ try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataMetaProvider
snapMetaProvider =
+
snapshotLocalDataManager.getOmSnapshotLocalDataMeta(snapshotId)) {
+ if (maxVersion < 0) {
+ maxVersion = snapMetaProvider.getMeta().getVersion();
+ }
+ if (maxVersion > snapMetaProvider.getMeta().getVersion()) {
+ throw new IllegalArgumentException(
+ String.format("Max Version to be deleted can never be greater
than the existing " +
+ "version of the snapshot. Argument passed : %d and
snapshotMaxVersion : %d", maxVersion,
+ snapMetaProvider.getMeta().getVersion()));
+ }
+ // Binary-search for the smallest existing version and delete the older
+ // versions starting from it. This is to ensure efficient crash recovery.
+ int smallestExistingVersion = 0;
+ int largestExistingVersion = maxVersion;
+ while (smallestExistingVersion <= largestExistingVersion) {
+ int midVersion = smallestExistingVersion + (largestExistingVersion -
smallestExistingVersion) / 2;
+ Path path =
OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(),
snapshotId, midVersion);
+ if (path.toFile().exists()) {
+ largestExistingVersion = midVersion - 1;
+ } else {
+ smallestExistingVersion = midVersion + 1;
+ }
+ }
+ // Delete the older version directories, always from smallest to largest,
+ // so the binary search above stays correct on subsequent runs.
+ for (int version = smallestExistingVersion; version <= maxVersion;
version++) {
+ Path path =
OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(),
snapshotId, version);
+ deleteDirectory(path);
+ }
+ }
+ }
+ }
+
/**
* Creates snapshot checkpoint that corresponds to snapshotInfo.
* @param omMetadataManager the metadata manager
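The binary search above relies on the invariant that existing checkpoint
directories form one contiguous range ending at maxVersion, because deletes
always proceed from the smallest version upward. A self-contained sketch of
that search (assumed names, same logic as the patch):

    // Returns the smallest version whose checkpoint directory still exists,
    // assuming directories for [result..maxVersion] exist and older ones do
    // not.
    static int findSmallestExistingVersion(
        java.util.function.IntPredicate dirExists, int maxVersion) {
      int lo = 0, hi = maxVersion;
      while (lo <= hi) {
        int mid = lo + (hi - lo) / 2;
        if (dirExists.test(mid)) {
          hi = mid - 1;  // directory present: an older survivor may exist
        } else {
          lo = mid + 1;  // directory already deleted: survivors are newer
        }
      }
      return lo;
    }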
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
index 3bc8a8dc27b..852a7a63723 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotPurgeResponse.java
@@ -18,22 +18,18 @@
package org.apache.hadoop.ozone.om.response.snapshot;
import static
org.apache.hadoop.ozone.om.codec.OMDBDefinition.SNAPSHOT_INFO_TABLE;
-import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_LOCK;
import com.google.common.annotations.VisibleForTesting;
import jakarta.annotation.Nonnull;
import java.io.IOException;
-import java.nio.file.Path;
import java.util.List;
import java.util.Map;
-import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
@@ -95,10 +91,9 @@ protected void addToDBBatch(OMMetadataManager
omMetadataManager,
if (snapshotInfo == null) {
continue;
}
-
+ OmSnapshotManager omSnapshotManager =
metadataManager.getOzoneManager().getOmSnapshotManager();
// Remove and close snapshot's RocksDB instance from SnapshotCache.
- ((OmMetadataManagerImpl)
omMetadataManager).getOzoneManager().getOmSnapshotManager()
- .invalidateCacheEntry(snapshotInfo.getSnapshotId());
+ omSnapshotManager.invalidateCacheEntry(snapshotInfo.getSnapshotId());
// Remove the snapshot from snapshotId to snapshotTableKey map.
((OmMetadataManagerImpl) omMetadataManager).getSnapshotChainManager()
.removeFromSnapshotIdToTable(snapshotInfo.getSnapshotId());
@@ -109,7 +104,8 @@ protected void addToDBBatch(OMMetadataManager
omMetadataManager,
// snapshot purged txn is flushed to rocksdb.
updateLocalData(snapshotLocalDataManager, snapshotInfo);
// Delete Snapshot checkpoint directory.
- deleteCheckpointDirectory(snapshotLocalDataManager, omMetadataManager,
snapshotInfo);
+
+
omSnapshotManager.deleteSnapshotCheckpointDirectories(snapshotInfo.getSnapshotId(),
-1);
// Delete snapshotInfo from the table.
omMetadataManager.getSnapshotInfoTable().deleteWithBatch(batchOperation,
dbKey);
}
@@ -133,34 +129,6 @@ private void updateLocalData(OmSnapshotLocalDataManager
localDataManager, Snapsh
}
}
- /**
- * Deletes the checkpoint directory for a snapshot.
- */
- private void deleteCheckpointDirectory(OmSnapshotLocalDataManager
snapshotLocalDataManager,
- OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo) throws
IOException {
- // Acquiring write lock to avoid race condition with sst filtering service
which creates a sst filtered file
- // inside the snapshot directory. Any operation apart which doesn't
create/delete files under this snapshot
- // directory can run in parallel along with this operation.
- OMLockDetails omLockDetails = omMetadataManager.getLock()
- .acquireWriteLock(SNAPSHOT_DB_LOCK,
snapshotInfo.getSnapshotId().toString());
- boolean acquiredSnapshotLock = omLockDetails.isLockAcquired();
- if (acquiredSnapshotLock) {
- try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataMetaProvider
snapMetaProvider =
-
snapshotLocalDataManager.getOmSnapshotLocalDataMeta(snapshotInfo)) {
- Path snapshotDirPath =
OmSnapshotManager.getSnapshotPath(omMetadataManager, snapshotInfo,
- snapMetaProvider.getMeta().getVersion());
- try {
- FileUtils.deleteDirectory(snapshotDirPath.toFile());
- } catch (IOException ex) {
- LOG.error("Failed to delete snapshot directory {} for snapshot {}",
- snapshotDirPath, snapshotInfo.getTableKey(), ex);
- } finally {
- omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_DB_LOCK,
snapshotInfo.getSnapshotId().toString());
- }
- }
- }
- }
-
@VisibleForTesting
public Map<String, SnapshotInfo> getUpdatedSnapInfos() {
return updatedSnapInfos;
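After this change the purge path reduces to a single call; a hedged recap
(per the javadoc above, a maxVersion of -1 defaults to the snapshot's current
max version):

    // Delete every checkpoint directory version of the purged snapshot.
    omSnapshotManager.deleteSnapshotCheckpointDirectories(
        snapshotInfo.getSnapshotId(), -1);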
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
index 3868436eb51..0699fdd39db 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotCache.java
@@ -291,7 +291,10 @@ public void release(UUID key) {
*/
public UncheckedAutoCloseableSupplier<OMLockDetails> lock() {
return lock(() -> lock.acquireResourceWriteLock(SNAPSHOT_DB_LOCK),
- () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK), () ->
cleanup(true));
+ () -> lock.releaseResourceWriteLock(SNAPSHOT_DB_LOCK), () -> {
+ cleanup(true);
+ return dbMap.isEmpty();
+ });
}
/**
@@ -303,7 +306,10 @@ public UncheckedAutoCloseableSupplier<OMLockDetails>
lock() {
public UncheckedAutoCloseableSupplier<OMLockDetails> lock(UUID snapshotId) {
return lock(() -> lock.acquireWriteLock(SNAPSHOT_DB_LOCK,
snapshotId.toString()),
() -> lock.releaseWriteLock(SNAPSHOT_DB_LOCK, snapshotId.toString()),
- () -> cleanup(snapshotId));
+ () -> {
+ cleanup(snapshotId, false);
+ return !dbMap.containsKey(snapshotId);
+ });
}
private OMLockDetails getEmptyOmLockDetails(OMLockDetails lockDetails) {
@@ -311,14 +317,13 @@ private OMLockDetails getEmptyOmLockDetails(OMLockDetails
lockDetails) {
}
private UncheckedAutoCloseableSupplier<OMLockDetails>
lock(Supplier<OMLockDetails> lockFunction,
- Supplier<OMLockDetails> unlockFunction, Supplier<Void> cleanupFunction) {
+ Supplier<OMLockDetails> unlockFunction, Supplier<Boolean>
cleanupFunction) {
Supplier<OMLockDetails> emptyLockFunction = () ->
getEmptyOmLockDetails(lockFunction.get());
Supplier<OMLockDetails> emptyUnlockFunction = () ->
getEmptyOmLockDetails(unlockFunction.get());
AtomicReference<OMLockDetails> lockDetails = new
AtomicReference<>(emptyLockFunction.get());
if (lockDetails.get().isLockAcquired()) {
- cleanupFunction.get();
- if (!dbMap.isEmpty()) {
+ if (!cleanupFunction.get()) {
lockDetails.set(emptyUnlockFunction.get());
}
}
@@ -349,14 +354,19 @@ public OMLockDetails get() {
private synchronized Void cleanup(boolean force) {
if (force || dbMap.size() > cacheSizeLimit) {
for (UUID evictionKey : pendingEvictionQueue) {
- cleanup(evictionKey);
+ cleanup(evictionKey, true);
}
}
return null;
}
- private synchronized Void cleanup(UUID evictionKey) {
+ private synchronized Void cleanup(UUID evictionKey, boolean
expectKeyToBePresent) {
ReferenceCounted<OmSnapshot> snapshot = dbMap.get(evictionKey);
+
+ if (!expectKeyToBePresent && snapshot == null) {
+ return null;
+ }
+
if (snapshot != null && snapshot.getTotalRefCount() == 0) {
try {
compactSnapshotDB(snapshot.get());
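The refactor above turns the cleanup callback into a Supplier&lt;Boolean&gt; that
reports whether eviction succeeded, so the write lock is retained only when
it did. A hedged sketch of the pattern (names mirror the diff; this is not
the full implementation):

    // Keep SNAPSHOT_DB_LOCK only if cleanup actually removed the entry;
    // otherwise release it right away so readers are not blocked needlessly.
    Supplier<Boolean> cleanupFunction = () -> {
      cleanup(snapshotId, false);
      return !dbMap.containsKey(snapshotId); // true => eviction succeeded
    };
    if (lockDetails.get().isLockAcquired() && !cleanupFunction.get()) {
      lockDetails.set(emptyUnlockFunction.get());
    }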
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
index 9be4a0d3389..c54f405e242 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/defrag/SnapshotDefragService.java
@@ -17,42 +17,80 @@
package org.apache.hadoop.ozone.om.snapshot.defrag;
+import static java.nio.file.Files.createDirectories;
+import static org.apache.commons.io.file.PathUtils.deleteDirectory;
+import static
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK;
import static
org.apache.hadoop.ozone.om.OMConfigKeys.SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT;
-import static org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_GC_LOCK;
+import static
org.apache.hadoop.ozone.om.OmSnapshotManager.COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT;
+import static
org.apache.hadoop.ozone.om.lock.FlatResource.SNAPSHOT_DB_CONTENT_LOCK;
+import static
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
+import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Collections;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.hdds.utils.db.ByteArrayCodec;
+import org.apache.hadoop.hdds.utils.db.CodecBuffer;
+import org.apache.hadoop.hdds.utils.db.CodecException;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.RDBSstFileWriter;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.hdds.utils.db.RocksDBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.hdds.utils.db.StringCodec;
import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedCompactRangeOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.SnapshotChainManager;
-import org.apache.hadoop.ozone.om.SstFilteringService;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OMLockDetails;
import org.apache.hadoop.ozone.om.snapshot.MultiSnapshotLocks;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
+import
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider;
+import org.apache.hadoop.ozone.om.snapshot.SnapshotUtils;
+import
org.apache.hadoop.ozone.om.snapshot.diff.delta.CompositeDeltaDiffComputer;
+import org.apache.hadoop.ozone.om.snapshot.util.TableMergeIterator;
import org.apache.hadoop.ozone.om.upgrade.OMLayoutFeature;
+import org.apache.hadoop.ozone.util.ClosableIterator;
+import org.apache.logging.log4j.util.Strings;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
+import org.apache.ozone.rocksdb.util.SstFileSetReader;
+import org.apache.ratis.util.UncheckedAutoCloseable;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
+import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,16 +124,24 @@ public class SnapshotDefragService extends
BackgroundService
private final AtomicLong snapshotsDefraggedCount;
private final AtomicBoolean running;
- private final MultiSnapshotLocks snapshotIdLocks;
+ private final MultiSnapshotLocks snapshotContentLocks;
private final OzoneConfiguration conf;
private final BootstrapStateHandler.Lock lock = new
BootstrapStateHandler.Lock();
+ private final String tmpDefragDir;
+ private final OmSnapshotManager omSnapshotManager;
+ private final OmSnapshotLocalDataManager snapshotLocalDataManager;
+ private final List<UUID> lockIds;
+ private final CompositeDeltaDiffComputer deltaDiffComputer;
+ private final Path differTmpDir;
public SnapshotDefragService(long interval, TimeUnit unit, long
serviceTimeout,
- OzoneManager ozoneManager, OzoneConfiguration configuration) {
+ OzoneManager ozoneManager, OzoneConfiguration configuration) throws
IOException {
super("SnapshotDefragService", interval, unit, DEFRAG_CORE_POOL_SIZE,
serviceTimeout, ozoneManager.getThreadNamePrefix());
this.ozoneManager = ozoneManager;
+ this.omSnapshotManager = ozoneManager.getOmSnapshotManager();
+ this.snapshotLocalDataManager =
omSnapshotManager.getSnapshotLocalDataManager();
this.snapshotLimitPerTask = configuration
.getLong(SNAPSHOT_DEFRAG_LIMIT_PER_TASK,
SNAPSHOT_DEFRAG_LIMIT_PER_TASK_DEFAULT);
@@ -103,7 +149,22 @@ public SnapshotDefragService(long interval, TimeUnit unit,
long serviceTimeout,
snapshotsDefraggedCount = new AtomicLong(0);
running = new AtomicBoolean(false);
IOzoneManagerLock omLock = ozoneManager.getMetadataManager().getLock();
- this.snapshotIdLocks = new MultiSnapshotLocks(omLock, SNAPSHOT_GC_LOCK,
true, 1);
+ this.snapshotContentLocks = new MultiSnapshotLocks(omLock,
SNAPSHOT_DB_CONTENT_LOCK, true, 1);
+ Path tmpDefragDirPath =
ozoneManager.getMetadataManager().getSnapshotParentDir().toAbsolutePath()
+ .resolve("tmp_defrag");
+ // Delete and recreate tmp dir if it exists
+ if (tmpDefragDirPath.toFile().exists()) {
+ deleteDirectory(tmpDefragDirPath);
+ }
+ createDirectories(tmpDefragDirPath);
+ this.tmpDefragDir = tmpDefragDirPath.toString();
+ this.differTmpDir = tmpDefragDirPath.resolve("differSstFiles");
+
+ this.deltaDiffComputer = new CompositeDeltaDiffComputer(omSnapshotManager,
+ ozoneManager.getMetadataManager(), differTmpDir, (status) -> {
+ LOG.debug("Snapshot defragmentation diff status: {}", status);
+ }, false, !isRocksToolsNativeLibAvailable());
+ this.lockIds = new ArrayList<>(1);
}
@Override
@@ -135,48 +196,318 @@ private boolean isRocksToolsNativeLibAvailable() {
}
/**
- * Checks if a snapshot needs defragmentation by examining its YAML metadata.
+ * Determines whether the specified snapshot requires defragmentation and
returns
+ * a pair indicating the need for defragmentation and the corresponding
version of the snapshot.
+ *
+ * @param snapshotInfo Information about the snapshot to be checked for
defragmentation.
+ * @return A pair containing a boolean value and an integer:
+ * - The boolean value indicates whether the snapshot requires
defragmentation
+ * (true if needed, false otherwise).
+ * - The integer represents the version of the snapshot being
evaluated.
+ * @throws IOException If an I/O error occurs while accessing the local
snapshot data or metadata.
*/
- private boolean needsDefragmentation(SnapshotInfo snapshotInfo) {
- if (!SstFilteringService.isSstFiltered(conf, snapshotInfo)) {
- return false;
- }
- try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider
readableOmSnapshotLocalDataProvider =
-
ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager().getOmSnapshotLocalData(snapshotInfo))
{
- Path snapshotPath = OmSnapshotManager.getSnapshotPath(
- ozoneManager.getMetadataManager(), snapshotInfo,
-
readableOmSnapshotLocalDataProvider.getSnapshotLocalData().getVersion());
+ @VisibleForTesting
+ Pair<Boolean, Integer> needsDefragmentation(SnapshotInfo snapshotInfo)
throws IOException {
+ // Update snapshot local metadata to point to the correct previous
snapshotId if it was different and check if
+ // snapshot needs defrag.
+ try (WritableOmSnapshotLocalDataProvider
writableOmSnapshotLocalDataProvider =
+
snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)) {
// Read snapshot local metadata from YAML
// Check if snapshot needs compaction (defragmentation)
- boolean needsDefrag = readableOmSnapshotLocalDataProvider.needsDefrag();
- LOG.debug("Snapshot {} needsDefragmentation field value: {}",
snapshotInfo.getName(), needsDefrag);
+ writableOmSnapshotLocalDataProvider.commit();
+ boolean needsDefrag = writableOmSnapshotLocalDataProvider.needsDefrag();
+ OmSnapshotLocalData localData =
writableOmSnapshotLocalDataProvider.getSnapshotLocalData();
+ if (!needsDefrag) {
+ OmSnapshotLocalData previousLocalData =
writableOmSnapshotLocalDataProvider.getPreviousSnapshotLocalData();
+ LOG.debug("Skipping defragmentation since snapshot has already been
defragmented: id : {}(version: {}=>{}) " +
+ "previousId: {}(version: {})", snapshotInfo.getSnapshotId(),
localData.getVersion(),
+
localData.getVersionSstFileInfos().get(localData.getVersion()).getPreviousSnapshotVersion(),
+ snapshotInfo.getPathPreviousSnapshotId(),
previousLocalData.getVersion());
+ } else {
+ LOG.debug("Snapshot {} needsDefragmentation field value: true",
snapshotInfo.getSnapshotId());
+ }
+ return Pair.of(needsDefrag, localData.getVersion());
+ }
+ }
- return needsDefrag;
- } catch (IOException e) {
- LOG.warn("Failed to read YAML metadata for snapshot {}, assuming defrag
needed",
- snapshotInfo.getName(), e);
- return true;
+ private Pair<String, String> getTableBounds(Table<String, ?> table) throws
RocksDatabaseException, CodecException {
+ String tableLowestValue = null, tableHighestValue = null;
+ try (TableIterator<String, String> keyIterator = table.keyIterator()) {
+ if (keyIterator.hasNext()) {
+ // Setting the lowest value to the first key in the table.
+ tableLowestValue = keyIterator.next();
+ }
+ keyIterator.seekToLast();
+ if (keyIterator.hasNext()) {
+ // Setting the highest value to the last key in the table.
+ tableHighestValue = keyIterator.next();
+ }
}
+ return Pair.of(tableLowestValue, tableHighestValue);
}
/**
- * Performs full defragmentation for the first snapshot in the chain.
- * This is a simplified implementation that demonstrates the concept.
+ * Performs a full defragmentation process for the specified tables in the
+ * checkpoint DB store.
+ * This method processes all the entries in the tables for the provided
prefix information,
+ * deletes specified key ranges, and compacts the tables to remove
tombstones.
+ *
+ * @param checkpointDBStore the checkpoint DB store whose tables are trimmed
+ * and compacted
+ * @param prefixInfo the prefix information used to identify bucket prefix
and determine key ranges in the tables
+ * @param incrementalTables the set of tables for which incremental
defragmentation is performed.
+ * @throws IOException if an I/O error occurs during table operations or
compaction
*/
- private void performFullDefragmentation(SnapshotInfo snapshotInfo,
- OmSnapshot omSnapshot) throws IOException {
+ @VisibleForTesting
+ void performFullDefragmentation(DBStore checkpointDBStore, TablePrefixInfo
prefixInfo,
+ Set<String> incrementalTables) throws IOException {
+ for (String table : incrementalTables) {
+ Table<String, byte[]> checkpointTable =
checkpointDBStore.getTable(table, StringCodec.get(),
+ ByteArrayCodec.get());
+ String tableBucketPrefix = prefixInfo.getTablePrefix(table);
+ String prefixUpperBound =
getLexicographicallyHigherString(tableBucketPrefix);
+
+ Pair<String, String> tableBounds = getTableBounds(checkpointTable);
+ String tableLowestValue = tableBounds.getLeft();
+ String tableHighestValue = tableBounds.getRight();
+
+ // If the lowest key is not null and the table's bucket prefix is greater
+ // than it, delete the range between the lowest key and the bucket prefix.
+ if (tableLowestValue != null &&
tableLowestValue.compareTo(tableBucketPrefix) < 0) {
+ checkpointTable.deleteRange(tableLowestValue, tableBucketPrefix);
+ }
+ // If the highest key is not null and the next lexicographically higher
+ // string of the table's bucket prefix is less than or equal to it, delete
+ // the range from that upper bound to the highest key, then delete the
+ // highest key itself since deleteRange excludes its end key.
+ if (tableHighestValue != null &&
tableHighestValue.compareTo(prefixUpperBound) >= 0) {
+ checkpointTable.deleteRange(prefixUpperBound, tableHighestValue);
+ checkpointTable.delete(tableHighestValue);
+ }
+ // Compact the table completely with kForce to get rid of tombstones.
+ try (ManagedCompactRangeOptions compactRangeOptions = new
ManagedCompactRangeOptions()) {
+
compactRangeOptions.setBottommostLevelCompaction(ManagedCompactRangeOptions.BottommostLevelCompaction.kForce);
+ compactRangeOptions.setExclusiveManualCompaction(true);
+ checkpointDBStore.compactTable(table, compactRangeOptions);
+ }
+ }
+ }
- // TODO: Implement full defragmentation
+ /**
+ * Spills table difference into an SST file based on the provided delta file
paths,
+ * current snapshot table, previous snapshot table, and an optional table
key prefix.
+ *
+ * The method reads the delta files and compares the records against the
snapshot tables.
+ * Any differences, including tombstones (deleted entries), are written to a
new SST file.
+ *
+ * @param deltaFilePaths the list of paths to the delta files to process
+ * @param snapshotTable the current snapshot table for comparison
+ * @param previousSnapshotTable the previous snapshot table for comparison
+ * @param tableKeyPrefix the prefix for filtering certain keys, or null if
all keys are to be included
+ * @return a pair of the path of the created SST file containing the
differences and a boolean
+ * indicating whether any delta entries were written (true if there
are differences, false otherwise)
+ * @throws IOException if an I/O error occurs during processing
+ */
+ @VisibleForTesting
+ Pair<Path, Boolean> spillTableDiffIntoSstFile(List<Path> deltaFilePaths,
Table<String, byte[]> snapshotTable,
+ Table<String, byte[]> previousSnapshotTable, String tableKeyPrefix)
throws IOException {
+ String sstFileReaderUpperBound = null;
+ if (Strings.isNotEmpty(tableKeyPrefix)) {
+ sstFileReaderUpperBound =
getLexicographicallyHigherString(tableKeyPrefix);
+ }
+ SstFileSetReader sstFileSetReader = new SstFileSetReader(deltaFilePaths);
+ Path fileToBeIngested = differTmpDir.resolve(snapshotTable.getName() + "-"
+ UUID.randomUUID()
+ + SST_FILE_EXTENSION);
+ int deltaEntriesCount = 0;
+ try (ClosableIterator<String> keysToCheck =
+ sstFileSetReader.getKeyStreamWithTombstone(tableKeyPrefix,
sstFileReaderUpperBound);
+ TableMergeIterator<String, byte[]> tableMergeIterator = new
TableMergeIterator<>(keysToCheck,
+ tableKeyPrefix, snapshotTable, previousSnapshotTable);
+ RDBSstFileWriter rdbSstFileWriter = new
RDBSstFileWriter(fileToBeIngested.toFile())) {
+ while (tableMergeIterator.hasNext()) {
+ Table.KeyValue<String, List<byte[]>> kvs = tableMergeIterator.next();
+ // If the values in the two snapshots differ, the entry must be written to
+ // the delta sstFile.
+ if (!Arrays.equals(kvs.getValue().get(0), kvs.getValue().get(1))) {
+ try (CodecBuffer key =
StringCodec.get().toHeapCodecBuffer(kvs.getKey())) {
+ byte[] keyArray = key.asReadOnlyByteBuffer().array();
+ byte[] val = kvs.getValue().get(0);
+ // If the value is null then add a tombstone to the delta sstFile.
+ if (val == null) {
+ rdbSstFileWriter.delete(keyArray);
+ } else {
+ rdbSstFileWriter.put(keyArray, val);
+ }
+ }
+ deltaEntriesCount++;
+ }
+ }
+ } catch (RocksDBException e) {
+ throw new RocksDatabaseException("Error while reading sst files.", e);
+ }
+ // If there are no delta entries, the caller skips ingesting the file and
+ // deletes it.
+ return Pair.of(fileToBeIngested, deltaEntriesCount != 0);
}
/**
- * Performs incremental defragmentation using diff from previous
defragmented snapshot.
+ * Performs an incremental defragmentation process, which involves
determining
+ * and processing delta files between snapshots for metadata updates. The
method
+ * computes the changes, manages file ingestion to the checkpoint metadata
manager,
+ * and ensures that all delta files are deleted after processing.
+ *
+ * @param previousSnapshotInfo information about the previous snapshot.
+ * @param snapshotInfo information about the current snapshot for which
+ * incremental defragmentation is performed.
+ * @param snapshotVersion the version of the snapshot to be processed.
+ * @param checkpointStore the dbStore instance where data
+ * updates are ingested after being processed.
+ * @param bucketPrefixInfo table prefix information associated with buckets,
+ * used to determine bounds for processing keys.
+ * @param incrementalTables the set of tables for which incremental
defragmentation is performed.
+ * @throws IOException if an I/O error occurs during processing.
*/
- private void performIncrementalDefragmentation(SnapshotInfo currentSnapshot,
- SnapshotInfo previousDefraggedSnapshot, OmSnapshot currentOmSnapshot)
+ @VisibleForTesting
+ void performIncrementalDefragmentation(SnapshotInfo previousSnapshotInfo,
SnapshotInfo snapshotInfo,
+ int snapshotVersion, DBStore checkpointStore, TablePrefixInfo
bucketPrefixInfo, Set<String> incrementalTables)
throws IOException {
+ // Compute the delta files between the snapshots; they are grouped by table name below.
+ Collection<Pair<Path, SstFileInfo>> allTableDeltaFiles =
this.deltaDiffComputer.getDeltaFiles(
+ previousSnapshotInfo, snapshotInfo, incrementalTables);
+
+ Map<String, List<Path>> tableGroupedDeltaFiles =
allTableDeltaFiles.stream()
+ .collect(Collectors.groupingBy(pair ->
pair.getValue().getColumnFamily(),
+ Collectors.mapping(Pair::getKey, Collectors.toList())));
+
+ String volumeName = snapshotInfo.getVolumeName();
+ String bucketName = snapshotInfo.getBucketName();
+
+ Set<Path> filesToBeDeleted = new HashSet<>();
+ // All files computed as delta must be deleted irrespective of whether
ingestion succeeded or not.
+ allTableDeltaFiles.forEach(pair -> filesToBeDeleted.add(pair.getKey()));
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot =
+ omSnapshotManager.getActiveSnapshot(volumeName, bucketName,
snapshotInfo.getName());
+ UncheckedAutoCloseableSupplier<OmSnapshot> previousSnapshot =
+ omSnapshotManager.getActiveSnapshot(volumeName, bucketName,
previousSnapshotInfo.getName())) {
+ for (Map.Entry<String, List<Path>> entry :
tableGroupedDeltaFiles.entrySet()) {
+ String table = entry.getKey();
+ List<Path> deltaFiles = entry.getValue();
+ Path fileToBeIngested;
+ if (deltaFiles.size() == 1 && snapshotVersion > 0) {
+ // If there is only one delta file for the table and the snapshot version
+ // is not 0, the same delta file can be reingested into the checkpointStore.
+ fileToBeIngested = deltaFiles.get(0);
+ } else {
+ Table<String, byte[]> snapshotTable =
snapshot.get().getMetadataManager().getStore()
+ .getTable(table, StringCodec.get(), ByteArrayCodec.get());
+ Table<String, byte[]> previousSnapshotTable =
previousSnapshot.get().getMetadataManager().getStore()
+ .getTable(table, StringCodec.get(), ByteArrayCodec.get());
+ String tableBucketPrefix = bucketPrefixInfo.getTablePrefix(table);
+ Pair<Path, Boolean> spillResult =
spillTableDiffIntoSstFile(deltaFiles, snapshotTable,
+ previousSnapshotTable, tableBucketPrefix);
+ fileToBeIngested = spillResult.getValue() ? spillResult.getLeft() :
null;
+ filesToBeDeleted.add(spillResult.getLeft());
+ }
+ if (fileToBeIngested != null) {
+ if (!fileToBeIngested.toFile().exists()) {
+ throw new IOException("Delta file does not exist: " +
fileToBeIngested);
+ }
+ Table checkpointTable = checkpointStore.getTable(table);
+ checkpointTable.loadFromFile(fileToBeIngested.toFile());
+ }
+ }
+ } finally {
+ for (Path path : filesToBeDeleted) {
+ if (path.toFile().exists()) {
+ if (!path.toFile().delete()) {
+ LOG.warn("Failed to delete file: {}", path);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Ingests non-incremental tables from a snapshot into a checkpoint database
store.
+ * This involves exporting table data from the snapshot to intermediate SST
files
+ * and ingesting them into the corresponding tables in the checkpoint
database store.
+ * Tables that are part of incremental defragmentation are excluded from
this process.
+ *
+ * @param checkpointDBStore the database store where non-incremental tables
are ingested.
+ * @param snapshotInfo the metadata information of the snapshot being
processed.
+ * @param bucketPrefixInfo prefix information used for determining table
prefixes.
+ * @param incrementalTables the set of tables identified for incremental
defragmentation.
+ * @throws IOException if an I/O error occurs during table ingestion or file
operations.
+ */
+ @VisibleForTesting
+ void ingestNonIncrementalTables(DBStore checkpointDBStore,
+ SnapshotInfo snapshotInfo, TablePrefixInfo bucketPrefixInfo, Set<String>
incrementalTables) throws IOException {
+ String volumeName = snapshotInfo.getVolumeName();
+ String bucketName = snapshotInfo.getBucketName();
+ String snapshotName = snapshotInfo.getName();
+ Set<Path> filesToBeDeleted = new HashSet<>();
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot =
omSnapshotManager.getActiveSnapshot(volumeName,
+ bucketName, snapshotName)) {
+ DBStore snapshotDBStore = snapshot.get().getMetadataManager().getStore();
+ for (Table snapshotTable : snapshotDBStore.listTables()) {
+ String snapshotTableName = snapshotTable.getName();
+ if (!incrementalTables.contains(snapshotTable.getName())) {
+ Path tmpSstFile = differTmpDir.resolve(snapshotTable.getName() + "-"
+ UUID.randomUUID()
+ + SST_FILE_EXTENSION);
+ filesToBeDeleted.add(tmpSstFile);
+ String prefix = bucketPrefixInfo.getTablePrefix(snapshotTableName);
+ byte[] prefixBytes = Strings.isBlank(prefix) ? null :
StringCodec.get().toPersistedFormat(prefix);
+ Table<byte[], byte[]> snapshotTableBytes =
snapshotDBStore.getTable(snapshotTableName, ByteArrayCodec.get(),
+ ByteArrayCodec.get());
+ snapshotTableBytes.dumpToFileWithPrefix(tmpSstFile.toFile(),
prefixBytes);
+ Table<byte[], byte[]> checkpointTable =
checkpointDBStore.getTable(snapshotTableName, ByteArrayCodec.get(),
+ ByteArrayCodec.get());
+ checkpointTable.loadFromFile(tmpSstFile.toFile());
+ }
+ }
+ } finally {
+ for (Path path : filesToBeDeleted) {
+ if (path.toFile().exists()) {
+ if (!path.toFile().delete()) {
+ LOG.warn("Failed to delete file for ingesting non incremental
table: {}", path);
+ }
+ }
+ }
+ }
+ }
- // TODO: Implement incremental defragmentation
+ /**
+ * Atomically switches the current snapshot database to a new version derived
+ * from the provided checkpoint directory. This involves moving the
checkpoint
+ * path to a versioned directory, updating the snapshot metadata, and
committing
+ * the changes to persist the snapshot version update.
+ *
+ * @param snapshotId The UUID identifying the snapshot to update.
+ * @param checkpointPath The path to the checkpoint directory that serves as
the basis
+ * for the updated snapshot version.
+ * @return The previous version number of the snapshot prior to the update.
+ * @throws IOException If an I/O error occurs during file operations,
checkpoint processing,
+ * or snapshot metadata updates.
+ */
+ @VisibleForTesting
+ int atomicSwitchSnapshotDB(UUID snapshotId, Path checkpointPath) throws
IOException {
+ try (WritableOmSnapshotLocalDataProvider snapshotLocalDataProvider =
+
snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotId)) {
+ OmSnapshotLocalData localData =
snapshotLocalDataProvider.getSnapshotLocalData();
+ Path nextVersionPath =
OmSnapshotManager.getSnapshotPath(ozoneManager.getMetadataManager(), snapshotId,
+ localData.getVersion() + 1);
+ // Remove the directory if it exists.
+ if (nextVersionPath.toFile().exists()) {
+ deleteDirectory(nextVersionPath);
+ }
+ // Move the checkpoint directory to the next version directory.
+ Files.move(checkpointPath, nextVersionPath);
+ RocksDBCheckpoint dbCheckpoint = new RocksDBCheckpoint(nextVersionPath);
+ // Add a new version to the local data file.
+ try (OmMetadataManagerImpl newVersionCheckpointMetadataManager =
+ OmMetadataManagerImpl.createCheckpointMetadataManager(conf,
dbCheckpoint, true)) {
+ RDBStore newVersionCheckpointStore = (RDBStore)
newVersionCheckpointMetadataManager.getStore();
+
snapshotLocalDataProvider.addSnapshotVersion(newVersionCheckpointStore);
+ snapshotLocalDataProvider.commit();
+ }
+ return localData.getVersion() - 1;
+ }
}
private final class SnapshotDefragTask implements BackgroundTask {
@@ -192,6 +523,105 @@ public BackgroundTaskResult call() throws Exception {
}
}
+ /**
+ * Creates a new checkpoint of a snapshot's DB store. This involves generating
+ * a temporary checkpoint and dropping every column family not in the
+ * retained set before returning the updated metadata manager.
+ *
+ * @param snapshotInfo Information about the snapshot for which the
checkpoint
+ * is being created.
+ * @param incrementalColumnFamilies A set of table names representing
incremental
+ * column families to be retained in the
checkpoint.
+ * @return A new instance of OmMetadataManagerImpl initialized with the
modified
+ * checkpoint.
+ * @throws IOException If an I/O error occurs during snapshot processing,
+ * checkpoint creation, or table operations.
+ */
+ @VisibleForTesting
+ OmMetadataManagerImpl createCheckpoint(SnapshotInfo snapshotInfo,
+ Set<String> incrementalColumnFamilies) throws IOException {
+ try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshot =
omSnapshotManager.getActiveSnapshot(
+ snapshotInfo.getVolumeName(), snapshotInfo.getBucketName(),
snapshotInfo.getName())) {
+ DBCheckpoint checkpoint =
snapshot.get().getMetadataManager().getStore().getCheckpoint(tmpDefragDir,
true);
+ try (OmMetadataManagerImpl metadataManagerBeforeTruncate =
+ OmMetadataManagerImpl.createCheckpointMetadataManager(conf,
checkpoint, false)) {
+ DBStore dbStore = metadataManagerBeforeTruncate.getStore();
+ for (String table : metadataManagerBeforeTruncate.listTableNames()) {
+ if (!incrementalColumnFamilies.contains(table)) {
+ dbStore.dropTable(table);
+ }
+ }
+ } catch (Exception e) {
+ throw new IOException("Failed to close checkpoint of snapshot: " +
snapshotInfo.getSnapshotId(), e);
+ }
+ // This will recreate the column families in the checkpoint.
+ return OmMetadataManagerImpl.createCheckpointMetadataManager(conf,
checkpoint, false);
+ }
+ }
+
+ private void acquireContentLock(UUID snapshotID) throws IOException {
+ lockIds.clear();
+ lockIds.add(snapshotID);
+ OMLockDetails lockDetails = snapshotContentLocks.acquireLock(lockIds);
+ if (!lockDetails.isLockAcquired()) {
+ throw new IOException("Failed to acquire lock on snapshot: " +
snapshotID);
+ }
+ LOG.debug("Acquired MultiSnapshotLocks on snapshot: {}", snapshotID);
+ }
+
+ private boolean checkAndDefragSnapshot(SnapshotChainManager chainManager,
UUID snapshotId) throws IOException {
+ SnapshotInfo snapshotInfo = SnapshotUtils.getSnapshotInfo(ozoneManager,
chainManager, snapshotId);
+
+ if (snapshotInfo.getSnapshotStatus() !=
SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE) {
+ LOG.debug("Skipping defragmentation for non-active snapshot: {} (ID:
{})",
+ snapshotInfo.getName(), snapshotInfo.getSnapshotId());
+ return false;
+ }
+ Pair<Boolean, Integer> needsDefragVersionPair =
needsDefragmentation(snapshotInfo);
+ if (!needsDefragVersionPair.getLeft()) {
+ return false;
+ }
+ // Create a checkpoint of the previous snapshot or the current snapshot if
it is the first snapshot in the chain.
+ SnapshotInfo checkpointSnapshotInfo =
snapshotInfo.getPathPreviousSnapshotId() == null ? snapshotInfo :
+ SnapshotUtils.getSnapshotInfo(ozoneManager, chainManager,
snapshotInfo.getPathPreviousSnapshotId());
+
+ OmMetadataManagerImpl checkpointMetadataManager =
createCheckpoint(checkpointSnapshotInfo,
+ COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ Path checkpointLocation =
checkpointMetadataManager.getStore().getDbLocation().toPath();
+ try {
+ DBStore checkpointDBStore = checkpointMetadataManager.getStore();
+ TablePrefixInfo prefixInfo =
ozoneManager.getMetadataManager().getTableBucketPrefix(snapshotInfo.getVolumeName(),
+ snapshotInfo.getBucketName());
+ // If first snapshot in the chain perform full defragmentation.
+ if (snapshotInfo.getPathPreviousSnapshotId() == null) {
+ performFullDefragmentation(checkpointDBStore, prefixInfo,
COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ } else {
+ performIncrementalDefragmentation(checkpointSnapshotInfo,
snapshotInfo, needsDefragVersionPair.getValue(),
+ checkpointDBStore, prefixInfo,
COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ }
+ int previousVersion;
+ // Acquire the content lock on the snapshot to ensure the table contents
+ // don't get changed.
+ acquireContentLock(snapshotId);
+ try {
+ // Ingestion of the incremental tables (KeyTable/FileTable/DirectoryTable)
+ // is done; now just reingest the remaining tables from the original snapshot.
+ ingestNonIncrementalTables(checkpointDBStore, snapshotInfo,
prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
+ checkpointMetadataManager.close();
+ checkpointMetadataManager = null;
+ // Switch the snapshot DB location to the new version.
+ previousVersion = atomicSwitchSnapshotDB(snapshotId,
checkpointLocation);
+ } finally {
+ snapshotContentLocks.releaseLock();
+ }
+ omSnapshotManager.deleteSnapshotCheckpointDirectories(snapshotId,
previousVersion);
+ } finally {
+ if (checkpointMetadataManager != null) {
+ checkpointMetadataManager.close();
+ }
+ }
+ return true;
+ }
+
public synchronized boolean triggerSnapshotDefragOnce() throws IOException {
final long count = runCount.incrementAndGet();
@@ -217,119 +647,26 @@ public synchronized boolean triggerSnapshotDefragOnce()
throws IOException {
final SnapshotChainManager snapshotChainManager =
((OmMetadataManagerImpl)
ozoneManager.getMetadataManager()).getSnapshotChainManager();
- final Table<String, SnapshotInfo> snapshotInfoTable =
- ozoneManager.getMetadataManager().getSnapshotInfoTable();
-
// Use iterator(false) to iterate forward through the snapshot chain
Iterator<UUID> snapshotIterator = snapshotChainManager.iterator(false);
long snapshotLimit = snapshotLimitPerTask;
-
while (snapshotLimit > 0 && running.get() && snapshotIterator.hasNext()) {
- // Get SnapshotInfo for the current snapshot in the chain
+ // Get SnapshotInfo for the current snapshot in the chain.
UUID snapshotId = snapshotIterator.next();
- String snapshotTableKey = snapshotChainManager.getTableKey(snapshotId);
- SnapshotInfo snapshotToDefrag = snapshotInfoTable.get(snapshotTableKey);
- if (snapshotToDefrag == null) {
- LOG.warn("Snapshot with ID '{}' not found in snapshot info table",
snapshotId);
- continue;
- }
-
- // Skip deleted snapshots
- if (snapshotToDefrag.getSnapshotStatus() ==
SnapshotInfo.SnapshotStatus.SNAPSHOT_DELETED) {
- LOG.debug("Skipping deleted snapshot: {} (ID: {})",
- snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
- continue;
- }
-
- // Check if this snapshot needs defragmentation
- if (!needsDefragmentation(snapshotToDefrag)) {
- LOG.debug("Skipping already defragged snapshot: {} (ID: {})",
- snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
- continue;
- }
-
- LOG.info("Will defrag snapshot: {} (ID: {})",
- snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
-
- // Acquire MultiSnapshotLocks
- if
(!snapshotIdLocks.acquireLock(Collections.singletonList(snapshotToDefrag.getSnapshotId()))
- .isLockAcquired()) {
- LOG.error("Abort. Failed to acquire lock on snapshot: {} (ID: {})",
- snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
- break;
- }
-
- try {
- LOG.info("Processing snapshot defragmentation for: {} (ID: {})",
- snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
-
- // Get snapshot through SnapshotCache for proper locking
- try (UncheckedAutoCloseableSupplier<OmSnapshot> snapshotSupplier =
-
snapshotManager.get().getSnapshot(snapshotToDefrag.getSnapshotId())) {
-
- OmSnapshot omSnapshot = snapshotSupplier.get();
-
- UUID pathPreviousSnapshotId =
snapshotToDefrag.getPathPreviousSnapshotId();
- boolean isFirstSnapshotInPath = pathPreviousSnapshotId == null;
- if (isFirstSnapshotInPath) {
- LOG.info("Performing full defragmentation for first snapshot (in
path): {}",
- snapshotToDefrag.getName());
- performFullDefragmentation(snapshotToDefrag, omSnapshot);
- } else {
- final String psIdtableKey =
snapshotChainManager.getTableKey(pathPreviousSnapshotId);
- SnapshotInfo previousDefraggedSnapshot =
snapshotInfoTable.get(psIdtableKey);
-
- LOG.info("Performing incremental defragmentation for snapshot: {}
" +
- "based on previous defragmented snapshot: {}",
- snapshotToDefrag.getName(),
previousDefraggedSnapshot.getName());
-
- // If previous path snapshot is not null, it must have been
defragmented already
- // Sanity check to ensure previous snapshot exists and is
defragmented
- if (needsDefragmentation(previousDefraggedSnapshot)) {
- LOG.error("Fatal error before defragging snapshot: {}. " +
- "Previous snapshot in path {} was not defragged while it
is expected to be.",
- snapshotToDefrag.getName(),
previousDefraggedSnapshot.getName());
- break;
- }
-
- performIncrementalDefragmentation(snapshotToDefrag,
- previousDefraggedSnapshot, omSnapshot);
- }
-
- // TODO: Update snapshot metadata here?
-
- // Close and evict the original snapshot DB from SnapshotCache
- // TODO: Implement proper eviction from SnapshotCache
- LOG.info("Defragmentation completed for snapshot: {}",
- snapshotToDefrag.getName());
-
+ try (UncheckedAutoCloseable lock =
getBootstrapStateLock().acquireReadLock()) {
+ if (checkAndDefragSnapshot(snapshotChainManager, snapshotId)) {
snapshotLimit--;
snapshotsDefraggedCount.getAndIncrement();
-
- } catch (OMException ome) {
- if (ome.getResult() == OMException.ResultCodes.FILE_NOT_FOUND) {
- LOG.info("Snapshot {} was deleted during defragmentation",
- snapshotToDefrag.getName());
- } else {
- LOG.error("OMException during snapshot defragmentation for: {}",
- snapshotToDefrag.getName(), ome);
- }
}
-
- } catch (Exception e) {
- LOG.error("Exception during snapshot defragmentation for: {}",
- snapshotToDefrag.getName(), e);
+ } catch (IOException e) {
+ LOG.error("Exception while defragmenting snapshot: {}", snapshotId, e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.error("Snapshot defragmentation task interrupted", e);
return false;
- } finally {
- // Release lock MultiSnapshotLocks
- snapshotIdLocks.releaseLock();
- LOG.debug("Released MultiSnapshotLocks on snapshot: {} (ID: {})",
- snapshotToDefrag.getName(), snapshotToDefrag.getSnapshotId());
-
}
}
-
return true;
}
@@ -371,6 +708,18 @@ public BootstrapStateHandler.Lock getBootstrapStateLock() {
public void shutdown() {
running.set(false);
super.shutdown();
+ try {
+ deltaDiffComputer.close();
+ } catch (IOException e) {
+ LOG.error("Error while closing delta diff computer.", e);
+ }
+ Path tmpDirPath = Paths.get(tmpDefragDir);
+ if (tmpDirPath.toFile().exists()) {
+ try {
+ deleteDirectory(tmpDirPath);
+ } catch (IOException e) {
+ LOG.error("Failed to delete temporary directory: {}", tmpDirPath, e);
+ }
+ }
}
}
-
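Putting the new service together, the per-snapshot flow in
checkAndDefragSnapshot is roughly the following hedged summary (helper names
are from the patch; lock handling and error cleanup are elided):

    // 1. Checkpoint the base snapshot's DB, keeping only the tracked
    //    (incremental) column families.
    OmMetadataManagerImpl cp =
        createCheckpoint(baseSnapshotInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
    // 2. Full defrag for the first snapshot in the path, incremental otherwise.
    if (isFirstInPath) {
      performFullDefragmentation(cp.getStore(), prefixInfo,
          COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
    } else {
      performIncrementalDefragmentation(previousInfo, snapshotInfo, version,
          cp.getStore(), prefixInfo, COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
    }
    // 3. Under the content lock, copy the non-incremental tables and switch
    //    the snapshot DB to the new version directory atomically.
    acquireContentLock(snapshotId);
    ingestNonIncrementalTables(cp.getStore(), snapshotInfo, prefixInfo,
        COLUMN_FAMILIES_TO_TRACK_IN_SNAPSHOT);
    int previousVersion = atomicSwitchSnapshotDB(snapshotId, checkpointLocation);
    // 4. Drop the now-stale older version directories.
    omSnapshotManager.deleteSnapshotCheckpointDirectories(snapshotId, previousVersion);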
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
new file mode 100644
index 00000000000..46fa02d21f3
--- /dev/null
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/defrag/TestSnapshotDefragService.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.snapshot.defrag;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.WritableOmSnapshotLocalDataProvider;
+import org.apache.hadoop.ozone.om.snapshot.diff.delta.CompositeDeltaDiffComputer;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
+import org.apache.hadoop.ozone.upgrade.LayoutFeature;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mock;
+import org.mockito.MockedConstruction;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Unit tests for SnapshotDefragService.
+ */
+public class TestSnapshotDefragService {
+
+ @Mock
+ private OzoneManager ozoneManager;
+
+ @Mock
+ private OmSnapshotManager omSnapshotManager;
+
+ @Mock
+ private OmSnapshotLocalDataManager snapshotLocalDataManager;
+
+ @Mock
+ private OmMetadataManagerImpl metadataManager;
+
+ @Mock
+ private IOzoneManagerLock omLock;
+
+ @Mock
+ private OMLayoutVersionManager versionManager;
+
+ @TempDir
+ private Path tempDir;
+ private SnapshotDefragService defragService;
+ private AutoCloseable mocks;
+
+ @BeforeEach
+ public void setup() throws IOException {
+ mocks = MockitoAnnotations.openMocks(this);
+ OzoneConfiguration configuration = new OzoneConfiguration();
+
+ // Setup basic mocks
+ when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
+ when(ozoneManager.getMetadataManager()).thenReturn(metadataManager);
+ when(ozoneManager.getThreadNamePrefix()).thenReturn("TestOM");
+ when(ozoneManager.isRunning()).thenReturn(true);
+ when(ozoneManager.getVersionManager()).thenReturn(versionManager);
+ when(ozoneManager.getOmRatisServer()).thenReturn(mock(OzoneManagerRatisServer.class));
+
+ when(omSnapshotManager.getSnapshotLocalDataManager()).thenReturn(snapshotLocalDataManager);
+ when(metadataManager.getLock()).thenReturn(omLock);
+ when(metadataManager.getSnapshotParentDir()).thenReturn(tempDir);
+ when(versionManager.isAllowed(any(LayoutFeature.class))).thenReturn(true);
+ try (MockedConstruction<CompositeDeltaDiffComputer> compositeDeltaDiffComputer =
+ mockConstruction(CompositeDeltaDiffComputer.class)) {
+ // Initialize service
+ defragService = new SnapshotDefragService(
+ 10000, // interval
+ TimeUnit.MILLISECONDS,
+ 60000, // timeout
+ ozoneManager,
+ configuration
+ );
+ assertEquals(1, compositeDeltaDiffComputer.constructed().size());
+ }
+
+ }
+
+ @AfterEach
+ public void tearDown() throws Exception {
+ if (defragService != null) {
+ defragService.shutdown();
+ }
+ if (mocks != null) {
+ mocks.close();
+ }
+ }
+
+ @Test
+ public void testServiceStartAndPause() {
+ defragService.start();
+ assertTrue(defragService.getSnapshotsDefraggedCount().get() >= 0);
+
+ defragService.pause();
+ assertNotNull(defragService);
+
+ defragService.resume();
+ assertNotNull(defragService);
+ }
+
+ @Test
+ public void testNeedsDefragmentationAlreadyDefragmented() throws IOException {
+ UUID snapshotId = UUID.randomUUID();
+ SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, "vol1", "bucket1", "snap1");
+
+ WritableOmSnapshotLocalDataProvider provider = mock(WritableOmSnapshotLocalDataProvider.class);
+ OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+ OmSnapshotLocalData previousLocalData = mock(OmSnapshotLocalData.class);
+
+ when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)).thenReturn(provider);
+ when(provider.needsDefrag()).thenReturn(false);
+ when(provider.getSnapshotLocalData()).thenReturn(localData);
+ when(provider.getPreviousSnapshotLocalData()).thenReturn(previousLocalData);
+ when(localData.getVersion()).thenReturn(1);
+ when(previousLocalData.getVersion()).thenReturn(0);
+
+
+ OmSnapshotLocalData.VersionMeta versionInfo = mock(OmSnapshotLocalData.VersionMeta.class);
+ when(versionInfo.getPreviousSnapshotVersion()).thenReturn(0);
+ Map<Integer, OmSnapshotLocalData.VersionMeta> versionMap =
ImmutableMap.of(1, versionInfo);
+ when(localData.getVersionSstFileInfos()).thenReturn(versionMap);
+
+ Pair<Boolean, Integer> result =
defragService.needsDefragmentation(snapshotInfo);
+
+ assertFalse(result.getLeft());
+ assertEquals(1, result.getRight());
+ verify(provider).commit();
+ verify(provider).close();
+ }
+
+ @Test
+ public void testNeedsDefragmentationRequiresDefrag() throws IOException {
+ UUID snapshotId = UUID.randomUUID();
+ SnapshotInfo snapshotInfo = createMockSnapshotInfo(snapshotId, "vol1", "bucket1", "snap1");
+
+ WritableOmSnapshotLocalDataProvider provider = mock(WritableOmSnapshotLocalDataProvider.class);
+ OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+ AtomicInteger commit = new AtomicInteger(0);
+ when(snapshotLocalDataManager.getWritableOmSnapshotLocalData(snapshotInfo)).thenReturn(provider);
+ when(provider.getSnapshotLocalData()).thenReturn(localData);
+ doAnswer(invocationOnMock -> {
+ commit.incrementAndGet();
+ return null;
+ }).when(provider).commit();
+ when(provider.needsDefrag()).thenAnswer(i -> commit.get() == 1);
+ int version = ThreadLocalRandom.current().nextInt(100);
+ when(localData.getVersion()).thenReturn(version);
+
+ Pair<Boolean, Integer> result =
defragService.needsDefragmentation(snapshotInfo);
+
+ assertTrue(result.getLeft());
+ assertEquals(version, result.getRight());
+ verify(provider).close();
+ }
+
+ /**
+ * Helper method to create a mock SnapshotInfo.
+ */
+ private SnapshotInfo createMockSnapshotInfo(UUID snapshotId, String volume,
+ String bucket, String name) {
+ SnapshotInfo.Builder builder = SnapshotInfo.newBuilder();
+ builder.setSnapshotId(snapshotId);
+ builder.setVolumeName(volume);
+ builder.setBucketName(bucket);
+ builder.setName(name);
+ builder.setSnapshotStatus(SnapshotInfo.SnapshotStatus.SNAPSHOT_ACTIVE);
+ builder.setCreationTime(System.currentTimeMillis());
+ return builder.build();
+ }
+}
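On the setup() above: Mockito's mockConstruction() intercepts every construction of the given type inside the try scope, which is how the test verifies that instantiating SnapshotDefragService builds exactly one CompositeDeltaDiffComputer internally. A toy, self-contained illustration of the mechanism — ArrayList is an arbitrary choice here, and the inline mock maker is assumed (mockito-inline, or Mockito 5+ where it is the default):

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mockConstruction;

import java.util.ArrayList;
import org.junit.jupiter.api.Test;
import org.mockito.MockedConstruction;

/** Toy illustration of mockConstruction scoping; not taken from the patch. */
class MockConstructionSketch {
  @Test
  void constructorCallsAreIntercepted() {
    try (MockedConstruction<ArrayList> mocked = mockConstruction(ArrayList.class)) {
      ArrayList<String> list = new ArrayList<>(); // inside the scope: returns a mock
      list.add("ignored");                        // no-op; mocks answer defaults
      assertEquals(1, mocked.constructed().size());
      assertEquals(0, list.size());               // default int answer is 0
    }
  }
}

The two needsDefragmentation tests above also pin down the method's contract: it returns a Pair whose left side says whether defrag is required and whose right side carries the snapshot's local-data version, and the writable local-data provider is closed in both paths.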
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java
index 19579a59e16..6ff77765618 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/diff/delta/TestRDBDifferComputer.java
@@ -527,3 +527,5 @@ private OmSnapshotLocalData createMockSnapshotLocalDataWithVersions(UUID snapsho
return localData;
}
}
+
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]