This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new dc2b5585818 HDDS-13867. SnapshotDiff delta file computation should
happen based on LocalDataYaml file (#9268)
dc2b5585818 is described below
commit dc2b55858180dd719205073a30db0e7cf84aa96f
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Mon Nov 17 21:37:39 2025 -0500
HDDS-13867. SnapshotDiff delta file computation should happen based on
LocalDataYaml file (#9268)
---
.../org/apache/ozone/rocksdb/util/RdbUtil.java | 18 +-
.../org/apache/ozone/rocksdb/util/SstFileInfo.java | 6 +
.../apache/ozone/rocksdiff/DifferSnapshotInfo.java | 72 ++--
.../ozone/rocksdiff/RocksDBCheckpointDiffer.java | 271 +++++++-------
.../org/apache/ozone/rocksdiff/RocksDiffUtils.java | 61 ++--
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 403 +++++++++++++--------
.../apache/ozone/rocksdiff/TestRocksDiffUtils.java | 98 +----
.../hadoop/ozone/freon/TestOMSnapshotDAG.java | 110 +++---
.../apache/hadoop/ozone/om/OmSnapshotManager.java | 2 +-
.../ozone/om/snapshot/SnapshotDiffManager.java | 139 ++++---
.../ozone/om/snapshot/TestSnapshotDiffManager.java | 129 ++++---
11 files changed, 711 insertions(+), 598 deletions(-)
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
index 95c4a4aa2bb..ac88102f800 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/RdbUtil.java
@@ -17,7 +17,6 @@
package org.apache.ozone.rocksdb.util;
-import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -41,28 +40,25 @@ public final class RdbUtil {
private RdbUtil() { }
- public static List<LiveFileMetaData> getLiveSSTFilesForCFs(
- final ManagedRocksDB rocksDB, Set<String> cfs) {
+ public static List<LiveFileMetaData> getLiveSSTFilesForCFs(final
ManagedRocksDB rocksDB, Set<String> cfs) {
return rocksDB.get().getLiveFilesMetaData().stream()
.filter(lfm ->
cfs.contains(StringUtils.bytes2String(lfm.columnFamilyName())))
.collect(Collectors.toList());
}
- public static Set<String> getSSTFilesForComparison(
- final ManagedRocksDB rocksDB, Set<String> cfs) {
- return getLiveSSTFilesForCFs(rocksDB, cfs).stream()
- .map(lfm -> new File(lfm.path(), lfm.fileName()).getPath())
+ public static Set<SstFileInfo> getSSTFilesForComparison(final ManagedRocksDB
rocksDB, Set<String> cfs) {
+ return getLiveSSTFilesForCFs(rocksDB, cfs).stream().map(SstFileInfo::new)
.collect(Collectors.toCollection(HashSet::new));
}
- public static Map<Object, String> getSSTFilesWithInodesForComparison(final
ManagedRocksDB rocksDB, Set<String> cfs)
- throws IOException {
+ public static Map<Object, SstFileInfo> getSSTFilesWithInodesForComparison(
+ final ManagedRocksDB rocksDB, Set<String> cfs) throws IOException {
List<LiveFileMetaData> liveSSTFilesForCFs = getLiveSSTFilesForCFs(rocksDB,
cfs);
- Map<Object, String> inodeToSstMap = new HashMap<>();
+ Map<Object, SstFileInfo> inodeToSstMap = new HashMap<>();
for (LiveFileMetaData lfm : liveSSTFilesForCFs) {
Path sstFilePath = Paths.get(lfm.path(), lfm.fileName());
Object inode = Files.readAttributes(sstFilePath,
BasicFileAttributes.class).fileKey();
- inodeToSstMap.put(inode, sstFilePath.toString());
+ inodeToSstMap.put(inode, new SstFileInfo(lfm));
}
return inodeToSstMap;
}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileInfo.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileInfo.java
index 50f8c4c54d0..bc4975f9868 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileInfo.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdb/util/SstFileInfo.java
@@ -18,7 +18,9 @@
package org.apache.ozone.rocksdb.util;
import static org.apache.commons.io.FilenameUtils.getBaseName;
+import static
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION;
+import java.nio.file.Path;
import java.util.Objects;
import org.apache.hadoop.hdds.StringUtils;
import org.apache.hadoop.hdds.utils.db.CopyObject;
@@ -89,6 +91,10 @@ public int hashCode() {
return Objects.hash(fileName, startKey, endKey, columnFamily);
}
+ public Path getFilePath(Path directoryPath) {
+ return directoryPath.resolve(fileName + SST_FILE_EXTENSION);
+ }
+
@Override
public SstFileInfo copyObject() {
return new SstFileInfo(fileName, startKey, endKey, columnFamily);
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
index c72f56d5f11..840ed37a246 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/DifferSnapshotInfo.java
@@ -17,56 +17,68 @@
package org.apache.ozone.rocksdiff;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.NavigableMap;
+import java.util.Set;
import java.util.UUID;
-import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
/**
* Snapshot information node class for the differ.
*/
public class DifferSnapshotInfo {
- private final String dbPath;
- private final UUID snapshotId;
- private final long snapshotGeneration;
+ private final UUID id;
+ private final long generation;
+ private final Function<Integer, Path> dbPathFunction;
+ private final NavigableMap<Integer, List<SstFileInfo>> versionSstFiles;
- private final TablePrefixInfo tablePrefixes;
+ public DifferSnapshotInfo(Function<Integer, Path> dbPathFunction, UUID id,
long gen,
+ NavigableMap<Integer, List<SstFileInfo>> sstFiles)
{
+ this.dbPathFunction = dbPathFunction;
+ this.id = id;
+ generation = gen;
+ this.versionSstFiles = sstFiles;
+ }
- private final ManagedRocksDB rocksDB;
+ public Path getDbPath(int version) {
+ return dbPathFunction.apply(version);
+ }
- public DifferSnapshotInfo(String db, UUID id, long gen,
- TablePrefixInfo tablePrefixInfo,
- ManagedRocksDB rocksDB) {
- dbPath = db;
- snapshotId = id;
- snapshotGeneration = gen;
- tablePrefixes = tablePrefixInfo;
- this.rocksDB = rocksDB;
+ public UUID getId() {
+ return id;
}
- public String getDbPath() {
- return dbPath;
+ public long getGeneration() {
+ return generation;
}
- public UUID getSnapshotId() {
- return snapshotId;
+ List<SstFileInfo> getSstFiles(int version, Set<String> tablesToLookup) {
+ return versionSstFiles.get(version).stream()
+ .filter(sstFileInfo ->
tablesToLookup.contains(sstFileInfo.getColumnFamily()))
+ .collect(Collectors.toList());
}
- public long getSnapshotGeneration() {
- return snapshotGeneration;
+ @VisibleForTesting
+ SstFileInfo getSstFile(int version, String fileName) {
+ return versionSstFiles.get(version).stream()
+ .filter(sstFileInfo -> sstFileInfo.getFileName().equals(fileName))
+ .findFirst().orElse(null);
}
- public TablePrefixInfo getTablePrefixes() {
- return tablePrefixes;
+ Integer getMaxVersion() {
+ return versionSstFiles.lastKey();
}
@Override
public String toString() {
- return String.format("DifferSnapshotInfo{dbPath='%s', snapshotID='%s', " +
- "snapshotGeneration=%d, tablePrefixes size=%s}",
- dbPath, snapshotId, snapshotGeneration, tablePrefixes.size());
- }
-
- public ManagedRocksDB getRocksDB() {
- return rocksDB;
+ return String.format("DifferSnapshotInfo{dbPath='%s', id='%s',
generation=%d}",
+ versionSstFiles.keySet().stream().collect(toMap(identity(),
dbPathFunction::apply)), id, generation);
}
}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
index 9de00948da7..2a3ae61fa56 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDBCheckpointDiffer.java
@@ -18,6 +18,7 @@
package org.apache.ozone.rocksdiff;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.function.Function.identity;
import static org.apache.commons.lang3.ArrayUtils.EMPTY_BYTE_ARRAY;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
@@ -32,7 +33,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
import com.google.common.graph.MutableGraph;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.BufferedWriter;
@@ -73,6 +73,7 @@
import org.apache.hadoop.hdds.utils.NativeLibraryNotLoadedException;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.RocksDatabaseException;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedEnvOptions;
import org.apache.hadoop.hdds.utils.db.managed.ManagedOptions;
@@ -84,7 +85,7 @@
import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.compaction.log.CompactionLogEntry;
-import org.apache.ozone.rocksdb.util.RdbUtil;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
import org.apache.ratis.util.UncheckedAutoCloseable;
import org.rocksdb.AbstractEventListener;
import org.rocksdb.ColumnFamilyHandle;
@@ -153,7 +154,7 @@ public class RocksDBCheckpointDiffer implements
AutoCloseable,
* Used to trim the file extension when writing compaction entries to the log
* to save space.
*/
- static final String SST_FILE_EXTENSION = ".sst";
+ public static final String SST_FILE_EXTENSION = ".sst";
public static final int SST_FILE_EXTENSION_LENGTH =
SST_FILE_EXTENSION.length();
static final String PRUNED_SST_FILE_TEMP = "pruned.sst.tmp";
@@ -597,57 +598,6 @@ private void createLink(Path link, Path source) {
}
}
- /**
- * Helper method to trim the filename retrieved from LiveFileMetaData.
- */
- private String trimSSTFilename(String filename) {
- if (!filename.startsWith("/")) {
- final String errorMsg = String.format(
- "Invalid start of filename: '%s'. Expected '/'", filename);
- LOG.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
- if (!filename.endsWith(SST_FILE_EXTENSION)) {
- final String errorMsg = String.format(
- "Invalid extension of file: '%s'. Expected '%s'",
- filename, SST_FILE_EXTENSION_LENGTH);
- LOG.error(errorMsg);
- throw new RuntimeException(errorMsg);
- }
- return filename.substring("/".length(),
- filename.length() - SST_FILE_EXTENSION_LENGTH);
- }
-
- /**
- * Read the current Live manifest for a given RocksDB instance (Active or
- * Checkpoint).
- * @param rocksDB open rocksDB instance.
- * @param tableFilter set of column-family/table names to include when
collecting live SSTs.
- * @return a list of SST files (without extension) in the DB.
- */
- public Set<String> readRocksDBLiveFiles(ManagedRocksDB rocksDB, Set<String>
tableFilter) {
- HashSet<String> liveFiles = new HashSet<>();
-
- final Set<String> cfs = Sets.newHashSet(
- org.apache.hadoop.hdds.StringUtils.bytes2String(
- RocksDB.DEFAULT_COLUMN_FAMILY), "keyTable", "directoryTable",
- "fileTable");
- // Note it retrieves only the selected column families by the descriptor
- // i.e. keyTable, directoryTable, fileTable
- List<LiveFileMetaData> liveFileMetaDataList =
- RdbUtil.getLiveSSTFilesForCFs(rocksDB, cfs);
- LOG.debug("SST File Metadata for DB: " + rocksDB.get().getName());
- for (LiveFileMetaData m : liveFileMetaDataList) {
- if
(!tableFilter.contains(StringUtils.bytes2String(m.columnFamilyName()))) {
- continue;
- }
- LOG.debug("File: {}, Level: {}", m.fileName(), m.level());
- final String trimmedFilename = trimSSTFilename(m.fileName());
- liveFiles.add(trimmedFilename);
- }
- return liveFiles;
- }
-
/**
* Process log line of compaction log text file input and populate the DAG.
* It also adds the compaction log entry to compaction log table.
@@ -791,12 +741,10 @@ private void preconditionChecksForLoadAllCompactionLogs()
{
* exist in backup directory before being involved in compactions),
* and appends the extension '.sst'.
*/
- private String getSSTFullPath(String sstFilenameWithoutExtension,
- String... dbPaths) {
+ private String getSSTFullPath(String sstFilenameWithoutExtension, Path...
dbPaths) {
// Try to locate the SST in the backup dir first
- final Path sstPathInBackupDir = Paths.get(sstBackupDir,
- sstFilenameWithoutExtension + SST_FILE_EXTENSION);
+ final Path sstPathInBackupDir = Paths.get(sstBackupDir,
sstFilenameWithoutExtension + SST_FILE_EXTENSION);
if (Files.exists(sstPathInBackupDir)) {
return sstPathInBackupDir.toString();
}
@@ -804,17 +752,15 @@ private String getSSTFullPath(String
sstFilenameWithoutExtension,
// SST file does not exist in the SST backup dir, this means the SST file
// has not gone through any compactions yet and is only available in the
// src DB directory or destDB directory
- for (String dbPath : dbPaths) {
- final Path sstPathInDBDir = Paths.get(dbPath,
- sstFilenameWithoutExtension + SST_FILE_EXTENSION);
+ for (Path dbPath : dbPaths) {
+ final Path sstPathInDBDir = dbPath.resolve(sstFilenameWithoutExtension +
SST_FILE_EXTENSION);
if (Files.exists(sstPathInDBDir)) {
return sstPathInDBDir.toString();
}
}
// TODO: More graceful error handling?
- throw new RuntimeException("Unable to locate SST file: " +
- sstFilenameWithoutExtension);
+ throw new RuntimeException("Unable to locate SST file: " +
sstFilenameWithoutExtension);
}
/**
@@ -824,6 +770,7 @@ private String getSSTFullPath(String
sstFilenameWithoutExtension,
*
* @param src source snapshot
* @param dest destination snapshot
+ * @param versionMap map from each source snapshot version to its corresponding
dest snapshot version.
* @param tablesToLookup tablesToLookup set of table (column family) names
used to restrict which SST files to return.
* @param sstFilesDirForSnapDiffJob dir to create hardlinks for SST files
* for snapDiff job.
@@ -832,22 +779,31 @@ private String getSSTFullPath(String
sstFilenameWithoutExtension,
* "/path/to/sstBackupDir/000060.sst"]
*/
public synchronized Optional<List<String>>
getSSTDiffListWithFullPath(DifferSnapshotInfo src,
- DifferSnapshotInfo dest, Set<String> tablesToLookup,
- String sstFilesDirForSnapDiffJob) {
+ DifferSnapshotInfo dest, Map<Integer, Integer> versionMap,
TablePrefixInfo prefixInfo,
+ Set<String> tablesToLookup, String sstFilesDirForSnapDiffJob) throws
IOException {
+ int srcVersion = src.getMaxVersion();
+ if (!versionMap.containsKey(srcVersion)) {
+ throw new IOException("No corresponding dest version corresponding
srcVersion : " + srcVersion + " in " +
+ "versionMap : " + versionMap);
+ }
+ int destVersion = versionMap.get(srcVersion);
+ DifferSnapshotVersion srcSnapshotVersion = new DifferSnapshotVersion(src,
src.getMaxVersion(), tablesToLookup);
+ DifferSnapshotVersion destSnapshotVersion = new
DifferSnapshotVersion(dest, destVersion, tablesToLookup);
- Optional<List<String>> sstDiffList = getSSTDiffList(src, dest,
tablesToLookup);
+ // If the source snapshot version is 0, use the compaction DAG path;
otherwise perform a full diff on the basis
+ // of the sst file names.
+ Optional<List<SstFileInfo>> sstDiffList =
getSSTDiffList(srcSnapshotVersion, destSnapshotVersion, prefixInfo,
+ tablesToLookup, srcVersion == 0);
return sstDiffList.map(diffList -> diffList.stream()
- .map(
- sst -> {
- String sstFullPath = getSSTFullPath(sst, src.getDbPath(),
dest.getDbPath());
- Path link = Paths.get(sstFilesDirForSnapDiffJob,
- sst + SST_FILE_EXTENSION);
- Path srcFile = Paths.get(sstFullPath);
- createLink(link, srcFile);
- return link.toString();
- })
- .collect(Collectors.toList()));
+ .map(sst -> {
+ String sstFullPath = getSSTFullPath(sst.getFileName(),
srcSnapshotVersion.getDbPath(),
+ destSnapshotVersion.getDbPath());
+ Path link = sst.getFilePath(Paths.get(sstFilesDirForSnapDiffJob));
+ Path srcFile = Paths.get(sstFullPath);
+ createLink(link, srcFile);
+ return link.toString();
+ }).collect(Collectors.toList()));
}
/**
@@ -859,57 +815,116 @@ public synchronized Optional<List<String>>
getSSTDiffListWithFullPath(DifferSnap
*
* @param src source snapshot
* @param dest destination snapshot
+ * @param prefixInfo TablePrefixInfo to filter irrelevant SST files; can be
null.
* @param tablesToLookup tablesToLookup Set of column-family (table) names
to include when reading SST files;
* must be non-null.
+ * @param useCompactionDag If true, the method uses the compaction history
to produce the incremental diff,
+ * otherwise a full diff would be performed on the
basis of the sst file names.
* @return A list of SST files without extension. e.g. ["000050", "000060"]
*/
- public synchronized Optional<List<String>> getSSTDiffList(DifferSnapshotInfo
src,
- DifferSnapshotInfo dest, Set<String> tablesToLookup) {
+ public synchronized Optional<List<SstFileInfo>>
getSSTDiffList(DifferSnapshotVersion src,
+ DifferSnapshotVersion dest, TablePrefixInfo prefixInfo, Set<String>
tablesToLookup, boolean useCompactionDag) {
// TODO: Reject or swap if dest is taken after src, once snapshot chain
// integration is done.
- Set<String> srcSnapFiles = readRocksDBLiveFiles(src.getRocksDB(),
tablesToLookup);
- Set<String> destSnapFiles = readRocksDBLiveFiles(dest.getRocksDB(),
tablesToLookup);
-
- Set<String> fwdDAGSameFiles = new HashSet<>();
- Set<String> fwdDAGDifferentFiles = new HashSet<>();
-
- LOG.debug("Doing forward diff from src '{}' to dest '{}'",
- src.getDbPath(), dest.getDbPath());
- internalGetSSTDiffList(src, dest, srcSnapFiles, destSnapFiles,
- fwdDAGSameFiles, fwdDAGDifferentFiles);
+ Map<String, SstFileInfo> fwdDAGSameFiles = new HashMap<>();
+ Map<String, SstFileInfo> fwdDAGDifferentFiles = new HashMap<>();
+ if (useCompactionDag) {
+ LOG.debug("Doing forward diff from src '{}' to dest '{}'",
src.getDbPath(), dest.getDbPath());
+ internalGetSSTDiffList(src, dest, fwdDAGSameFiles, fwdDAGDifferentFiles);
+ } else {
+ Set<SstFileInfo> srcSstFileInfos = new
HashSet<>(src.getSstFileMap().values());
+ Set<SstFileInfo> destSstFileInfos = new
HashSet<>(dest.getSstFileMap().values());
+ for (SstFileInfo srcSstFileInfo : srcSstFileInfos) {
+ if (destSstFileInfos.contains(srcSstFileInfo)) {
+ fwdDAGSameFiles.put(srcSstFileInfo.getFileName(), srcSstFileInfo);
+ } else {
+ fwdDAGDifferentFiles.put(srcSstFileInfo.getFileName(),
srcSstFileInfo);
+ }
+ }
+ for (SstFileInfo destSstFileInfo : destSstFileInfos) {
+ if (srcSstFileInfos.contains(destSstFileInfo)) {
+ fwdDAGSameFiles.put(destSstFileInfo.getFileName(), destSstFileInfo);
+ } else {
+ fwdDAGDifferentFiles.put(destSstFileInfo.getFileName(),
destSstFileInfo);
+ }
+ }
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Result of diff from src '" + src.getDbPath() + "' to dest '" +
dest.getDbPath() + "':");
StringBuilder logSB = new StringBuilder();
logSB.append("Fwd DAG same SST files: ");
- for (String file : fwdDAGSameFiles) {
+ for (String file : fwdDAGSameFiles.keySet()) {
logSB.append(file).append(SPACE_DELIMITER);
}
LOG.debug(logSB.toString());
logSB.setLength(0);
logSB.append("Fwd DAG different SST files: ");
- for (String file : fwdDAGDifferentFiles) {
+ for (String file : fwdDAGDifferentFiles.keySet()) {
logSB.append(file).append(SPACE_DELIMITER);
}
LOG.debug("{}", logSB);
}
// Check if the DAG traversal was able to reach all the destination SST
files.
- for (String destSnapFile : destSnapFiles) {
- if (!fwdDAGSameFiles.contains(destSnapFile) &&
!fwdDAGDifferentFiles.contains(destSnapFile)) {
+ for (String destSnapFile : dest.getSstFiles()) {
+ if (!fwdDAGSameFiles.containsKey(destSnapFile) &&
!fwdDAGDifferentFiles.containsKey(destSnapFile)) {
return Optional.empty();
}
}
- if (src.getTablePrefixes() != null && src.getTablePrefixes().size() != 0) {
- RocksDiffUtils.filterRelevantSstFiles(fwdDAGDifferentFiles,
src.getTablePrefixes(),
- compactionDag.getCompactionMap(), tablesToLookup, src.getRocksDB(),
dest.getRocksDB());
+ if (prefixInfo != null && prefixInfo.size() != 0) {
+ RocksDiffUtils.filterRelevantSstFiles(fwdDAGDifferentFiles,
tablesToLookup, prefixInfo);
+ }
+ return Optional.of(new ArrayList<>(fwdDAGDifferentFiles.values()));
+ }
+
+ /**
+ * This class represents a version of a snapshot in a database differ
operation.
+ * It contains metadata associated with a specific snapshot version,
including
+ * SST file information, generation id, and the database path for the given
version.
+ *
+ * Designed to work with `DifferSnapshotInfo`, this class allows the
retrieval of
+ * snapshot-related metadata and facilitates mapping of SST files for
version comparison
+ * and other operations.
+ *
+ * The core functionality is to store and provide read-only access to:
+ * - SST file information for a specified snapshot version.
+ * - Snapshot generation identifier.
+ * - Path to the database directory corresponding to the snapshot version.
+ */
+ public static class DifferSnapshotVersion {
+ private Map<String, SstFileInfo> sstFiles;
+ private long generation;
+ private Path dbPath;
+
+ public DifferSnapshotVersion(DifferSnapshotInfo differSnapshotInfo, int
version,
+ Set<String> tablesToLookup) {
+ this.sstFiles = differSnapshotInfo.getSstFiles(version, tablesToLookup)
+ .stream().collect(Collectors.toMap(SstFileInfo::getFileName,
identity()));
+ this.generation = differSnapshotInfo.getGeneration();
+ this.dbPath = differSnapshotInfo.getDbPath(version);
+ }
+
+ private Path getDbPath() {
+ return dbPath;
+ }
+
+ private long getGeneration() {
+ return generation;
+ }
+
+ private Set<String> getSstFiles() {
+ return sstFiles.keySet();
+ }
+
+ private Map<String, SstFileInfo> getSstFileMap() {
+ return Collections.unmodifiableMap(sstFiles);
}
- return Optional.of(new ArrayList<>(fwdDAGDifferentFiles));
}
/**
@@ -921,30 +936,26 @@ public synchronized Optional<List<String>>
getSSTDiffList(DifferSnapshotInfo src
* diffing). Otherwise, add it to the differentFiles map, as it will
* need further diffing.
*/
- synchronized void internalGetSSTDiffList(
- DifferSnapshotInfo src,
- DifferSnapshotInfo dest,
- Set<String> srcSnapFiles,
- Set<String> destSnapFiles,
- Set<String> sameFiles,
- Set<String> differentFiles) {
+ synchronized void internalGetSSTDiffList(DifferSnapshotVersion src,
DifferSnapshotVersion dest,
+ Map<String, SstFileInfo> sameFiles, Map<String, SstFileInfo>
differentFiles) {
Preconditions.checkArgument(sameFiles.isEmpty(), "Set must be empty");
Preconditions.checkArgument(differentFiles.isEmpty(), "Set must be empty");
-
- for (String fileName : srcSnapFiles) {
- if (destSnapFiles.contains(fileName)) {
+ Map<String, SstFileInfo> destSnapFiles = dest.getSstFileMap();
+ for (Map.Entry<String, SstFileInfo> sstFileEntry :
src.getSstFileMap().entrySet()) {
+ String fileName = sstFileEntry.getKey();
+ SstFileInfo sstFileInfo = sstFileEntry.getValue();
+ if (destSnapFiles.containsKey(fileName)) {
LOG.debug("Source '{}' and destination '{}' share the same SST '{}'",
src.getDbPath(), dest.getDbPath(), fileName);
- sameFiles.add(fileName);
+ sameFiles.put(fileName, sstFileInfo);
continue;
}
CompactionNode infileNode = compactionDag.getCompactionNode(fileName);
if (infileNode == null) {
- LOG.debug("Source '{}' SST file '{}' is never compacted",
- src.getDbPath(), fileName);
- differentFiles.add(fileName);
+ LOG.debug("Source '{}' SST file '{}' is never compacted",
src.getDbPath(), fileName);
+ differentFiles.put(fileName, sstFileInfo);
continue;
}
@@ -954,15 +965,12 @@ synchronized void internalGetSSTDiffList(
// Traversal level/depth indicator for debug print
int level = 1;
while (!currentLevel.isEmpty()) {
- LOG.debug("Traversal level: {}. Current level has {} nodes.",
- level++, currentLevel.size());
+ LOG.debug("Traversal level: {}. Current level has {} nodes.", level++,
currentLevel.size());
if (level >= 1000000) {
- final String errorMsg = String.format(
- "Graph traversal level exceeded allowed maximum (%d). "
- + "This could be due to invalid input generating a "
- + "loop in the traversal path. Same SSTs found so far: %s, "
- + "different SSTs: %s", level, sameFiles, differentFiles);
+ final String errorMsg = String.format("Graph traversal level
exceeded allowed maximum (%d). " +
+ "This could be due to invalid input generating a loop in the
traversal path. Same SSTs found so " +
+ "far: %s, different SSTs: %s", level, sameFiles, differentFiles);
LOG.error(errorMsg);
// Clear output in case of error. Expect fall back to full diff
sameFiles.clear();
@@ -974,43 +982,42 @@ synchronized void internalGetSSTDiffList(
final Set<CompactionNode> nextLevel = new HashSet<>();
for (CompactionNode current : currentLevel) {
LOG.debug("Processing node: '{}'", current.getFileName());
- if (current.getSnapshotGeneration() < dest.getSnapshotGeneration()) {
+ if (current.getSnapshotGeneration() < dest.getGeneration()) {
LOG.debug("Current node's snapshot generation '{}' "
+ "reached destination snapshot's '{}'. "
+ "Src '{}' and dest '{}' have different SST file: '{}'",
- current.getSnapshotGeneration(), dest.getSnapshotGeneration(),
+ current.getSnapshotGeneration(), dest.getGeneration(),
src.getDbPath(), dest.getDbPath(), current.getFileName());
- differentFiles.add(current.getFileName());
+ differentFiles.put(current.getFileName(), current);
continue;
}
Set<CompactionNode> successors =
compactionDag.getForwardCompactionDAG().successors(current);
if (successors.isEmpty()) {
- LOG.debug("No further compaction happened to the current file. " +
- "Src '{}' and dest '{}' have different file: {}",
- src.getDbPath(), dest.getDbPath(), current.getFileName());
- differentFiles.add(current.getFileName());
+ LOG.debug("No further compaction happened to the current file. Src
'{}' and dest '{}' " +
+ "have different file: {}", src.getDbPath(),
dest.getDbPath(), current.getFileName());
+ differentFiles.put(current.getFileName(), current);
continue;
}
for (CompactionNode nextNode : successors) {
- if (sameFiles.contains(nextNode.getFileName()) ||
- differentFiles.contains(nextNode.getFileName())) {
+ if (sameFiles.containsKey(nextNode.getFileName()) ||
+ differentFiles.containsKey(nextNode.getFileName())) {
LOG.debug("Skipping known processed SST: {}",
nextNode.getFileName());
continue;
}
- if (destSnapFiles.contains(nextNode.getFileName())) {
- LOG.debug("Src '{}' and dest '{}' have the same SST: {}",
- src.getDbPath(), dest.getDbPath(), nextNode.getFileName());
- sameFiles.add(nextNode.getFileName());
+ if (destSnapFiles.containsKey(nextNode.getFileName())) {
+ LOG.debug("Src '{}' and dest '{}' have the same SST: {}",
src.getDbPath(), dest.getDbPath(),
+ nextNode.getFileName());
+ sameFiles.put(nextNode.getFileName(),
destSnapFiles.get(nextNode.getFileName()));
continue;
}
// Queue different SST to the next level
- LOG.debug("Src '{}' and dest '{}' have a different SST: {}",
- src.getDbPath(), dest.getDbPath(), nextNode.getFileName());
+ LOG.debug("Src '{}' and dest '{}' have a different SST: {}",
src.getDbPath(), dest.getDbPath(),
+ nextNode.getFileName());
nextLevel.add(nextNode);
}
}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
index 7d9512768bc..c85c02f4c3b 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/main/java/org/apache/ozone/rocksdiff/RocksDiffUtils.java
@@ -20,17 +20,11 @@
import static org.apache.hadoop.hdds.StringUtils.getFirstNChars;
import com.google.common.annotations.VisibleForTesting;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import org.apache.commons.io.FilenameUtils;
import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
-import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.rocksdb.util.SstFileInfo;
-import org.rocksdb.LiveFileMetaData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,40 +48,41 @@ public static boolean isKeyWithPrefixPresent(String
prefixForColumnFamily,
&& prefixForColumnFamily.compareTo(endKeyPrefix) <= 0;
}
- public static void filterRelevantSstFiles(Set<String> inputFiles,
- TablePrefixInfo tablePrefixInfo,
- Set<String> columnFamiliesToLookup,
- ManagedRocksDB... dbs) {
- filterRelevantSstFiles(inputFiles, tablePrefixInfo,
Collections.emptyMap(), columnFamiliesToLookup, dbs);
+ /**
+ * Filter sst files based on prefixes. The map of sst files to be filtered
would be mutated.
+ * @param <T> Type of the key in the map.
+ * @param filesMapToBeFiltered Map of sst files to be filtered.
+ * @param tablesToLookup Set of column families to be included in the diff.
+ * @param tablePrefixInfo TablePrefixInfo to filter irrelevant SST files.
+ */
+ public static <T> Map<T, SstFileInfo> filterRelevantSstFiles(Map<T,
SstFileInfo> filesMapToBeFiltered,
+ Set<String> tablesToLookup, TablePrefixInfo tablePrefixInfo) {
+ for (Iterator<Map.Entry<T, SstFileInfo>> fileIterator =
filesMapToBeFiltered.entrySet().iterator();
+ fileIterator.hasNext();) {
+ SstFileInfo sstFileInfo = fileIterator.next().getValue();
+ if (shouldSkipNode(sstFileInfo, tablePrefixInfo, tablesToLookup)) {
+ fileIterator.remove();
+ }
+ }
+ return filesMapToBeFiltered;
}
/**
- * Filter sst files based on prefixes.
+ * Filter sst files based on prefixes. The set of sst files to be filtered
would be mutated.
+ * @param <T> Unused type parameter; this overload operates on a set, not a map.
+ * @param filesToBeFiltered sst files to be filtered.
+ * @param tablesToLookup Set of column families to be included in the diff.
+ * @param tablePrefixInfo TablePrefixInfo to filter irrelevant SST files.
*/
- public static void filterRelevantSstFiles(Set<String> inputFiles,
- TablePrefixInfo tablePrefixInfo,
- Map<String, CompactionNode>
preExistingCompactionNodes,
- Set<String> columnFamiliesToLookup,
- ManagedRocksDB... dbs) {
- Map<String, LiveFileMetaData> liveFileMetaDataMap = new HashMap<>();
- int dbIdx = 0;
- for (Iterator<String> fileIterator =
- inputFiles.iterator(); fileIterator.hasNext();) {
- String filename = FilenameUtils.getBaseName(fileIterator.next());
- while (!preExistingCompactionNodes.containsKey(filename) &&
!liveFileMetaDataMap.containsKey(filename)
- && dbIdx < dbs.length) {
- liveFileMetaDataMap.putAll(dbs[dbIdx].getLiveMetadataForSSTFiles());
- dbIdx += 1;
- }
- CompactionNode compactionNode = preExistingCompactionNodes.get(filename);
- if (compactionNode == null) {
- compactionNode = new CompactionNode(new
CompactionFileInfo.Builder(filename)
- .setValues(liveFileMetaDataMap.get(filename)).build());
- }
- if (shouldSkipNode(compactionNode, tablePrefixInfo,
columnFamiliesToLookup)) {
+ public static <T> Set<SstFileInfo> filterRelevantSstFiles(Set<SstFileInfo>
filesToBeFiltered,
+ Set<String> tablesToLookup, TablePrefixInfo tablePrefixInfo) {
+ for (Iterator<SstFileInfo> fileIterator = filesToBeFiltered.iterator();
fileIterator.hasNext();) {
+ SstFileInfo sstFileInfo = fileIterator.next();
+ if (shouldSkipNode(sstFileInfo, tablePrefixInfo, tablesToLookup)) {
fileIterator.remove();
}
}
+ return filesToBeFiltered;
}
@VisibleForTesting
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index d24790dfcfc..8af38c5454b 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -41,11 +41,14 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.graph.MutableGraph;
@@ -67,6 +70,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
@@ -76,10 +80,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
-import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.StringUtils;
@@ -104,7 +106,8 @@
import org.apache.hadoop.util.Time;
import org.apache.ozone.compaction.log.CompactionFileInfo;
import org.apache.ozone.compaction.log.CompactionLogEntry;
-import org.apache.ozone.rocksdb.util.RdbUtil;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
+import
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotVersion;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.NodeComparator;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.util.UncheckedAutoCloseable;
@@ -112,6 +115,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -131,6 +135,8 @@
* Test RocksDBCheckpointDiffer basic functionality.
*/
public class TestRocksDBCheckpointDiffer {
+ @TempDir
+ private static File dbDir;
private static final Logger LOG =
LoggerFactory.getLogger(TestRocksDBCheckpointDiffer.class);
@@ -279,8 +285,6 @@ public class TestRocksDBCheckpointDiffer {
private final List<File> cpDirList = new ArrayList<>();
- private final List<List<ColumnFamilyHandle>> colHandles = new ArrayList<>();
-
private static final String ACTIVE_DB_DIR_NAME = "./rocksdb-data";
private static final String METADATA_DIR_NAME = "./metadata";
private static final String COMPACTION_LOG_DIR_NAME = "compaction-log";
@@ -408,7 +412,7 @@ public void cleanUp() {
}
}
- private static List<CompactionLogEntry> getPrunedCompactionEntries(boolean
prune, Map<String, String[]> metadata) {
+ private static List<CompactionLogEntry> getPrunedCompactionEntries(boolean
prune, Map<String, SstFileInfo> metadata) {
List<CompactionLogEntry> entries = new ArrayList<>();
if (!prune) {
entries.add(createCompactionEntry(1,
@@ -431,6 +435,122 @@ private static List<CompactionLogEntry>
getPrunedCompactionEntries(boolean prune
return entries;
}
+ /**
+ * Creates a Mockito mock of {@link DifferSnapshotInfo}. Note that, despite the
+ * method name, the returned object is a DifferSnapshotInfo and not a
+ * DifferSnapshotVersion.
+ * @param dbPath path string; getDbPath returns Paths.get(dbPath) for any int argument.
+ * @param generation value returned by getGeneration.
+ * @return the stubbed mock; only getDbPath and getGeneration are stubbed here.
+ */
+ private static DifferSnapshotInfo mockDifferSnapshotVersion(String dbPath,
long generation) {
+ DifferSnapshotInfo differSnapshotInfo = mock(DifferSnapshotInfo.class);
+ when(differSnapshotInfo.getDbPath(anyInt())).thenReturn(Paths.get(dbPath));
+ when(differSnapshotInfo.getGeneration()).thenReturn(generation);
+ return differSnapshotInfo;
+ }
+
+ /**
+ * Argument provider for testGetSSTDiffListWithoutCompactionDag.
+ * Each Arguments tuple is, in order: a description of the case, the source
+ * snapshot's SST files, the destination snapshot's SST files, a map from
+ * table (column family) name to key prefix, the set of tables to look up,
+ * and the expected list of diff SST files.
+ */
+ private static Stream<Arguments> getSSTDiffListWithoutCompactionDAGCase() {
+ return Stream.of(
+ Arguments.of("Delta File with same source and target",
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ad", "ag", "cf1")),
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ad", "ag", "cf1")),
+ ImmutableMap.of("cf1", "a", "cf2", "z"), ImmutableSet.of("cf1"),
Collections.emptyList()),
+ Arguments.of("Delta File with source having more files",
+ ImmutableList.of(
+ new SstFileInfo("2", "ad", "ag", "cf1"),
+ new SstFileInfo("3", "af", "ah", "cf1")),
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ad", "ag", "cf1"),
+ new SstFileInfo("3", "af", "ah", "cf1")),
+ ImmutableMap.of("cf1", "a", "cf2", "z"),
+ ImmutableSet.of("cf1"),
+ ImmutableList.of(new SstFileInfo("1", "ac", "ae", "cf1"))),
+ Arguments.of("Delta File with target having more files",
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ad", "ag", "cf1"),
+ new SstFileInfo("3", "af", "ah", "cf1")),
+ ImmutableList.of(
+ new SstFileInfo("2", "ad", "ag", "cf1"),
+ new SstFileInfo("3", "af", "ah", "cf1")),
+ ImmutableMap.of("cf1", "a", "cf2", "z"),
+ ImmutableSet.of("cf1"),
+ ImmutableList.of(new SstFileInfo("1", "ac", "ae", "cf1"))),
+ Arguments.of("Delta File computation with source files with invalid
prefix",
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "bh", "bi", "cf1")),
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("4", "af", "ai", "cf1")),
+ ImmutableMap.of("cf1", "a", "cf2", "z"),
+ ImmutableSet.of("cf1"),
+ ImmutableList.of(new SstFileInfo("4", "af", "ai", "cf1"))),
+ Arguments.of("Delta File computation with target files with invalid
prefix",
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ah", "ai", "cf1")),
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("4", "bf", "bi", "cf1")),
+ ImmutableMap.of("cf1", "a", "cf2", "z"),
+ ImmutableSet.of("cf1"),
+ ImmutableList.of(new SstFileInfo("2", "ah", "ai", "cf1"))),
+ Arguments.of("Delta File computation with target files with multiple
tables",
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ah", "ai", "cf1"),
+ new SstFileInfo("3", "ah", "ai", "cf3")),
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ah", "ai", "cf1"),
+ new SstFileInfo("5", "af", "ai", "cf4")),
+ ImmutableMap.of("cf1", "a", "cf2", "z"), ImmutableSet.of("cf1"),
Collections.emptyList()),
+ Arguments.of("Delta File computation with target files with multiple
tables to lookup on source",
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ah", "ai", "cf1"),
+ new SstFileInfo("3", "ah", "ai", "cf3")),
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ah", "ai", "cf1"),
+ new SstFileInfo("5", "af", "ai", "cf4")),
+ ImmutableMap.of("cf1", "a", "cf2", "z"),
+ ImmutableSet.of("cf1", "cf3"),
+ ImmutableList.of(new SstFileInfo("3", "ah", "ai", "cf3"))),
+ Arguments.of("Delta File computation with target files with multiple
tables to lookup on target",
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ah", "ai", "cf1"),
+ new SstFileInfo("3", "ah", "ai", "cf3")),
+ ImmutableList.of(
+ new SstFileInfo("1", "ac", "ae", "cf1"),
+ new SstFileInfo("2", "ah", "ai", "cf1"),
+ new SstFileInfo("5", "af", "ai", "cf4")),
+ ImmutableMap.of("cf1", "a", "cf2", "z"),
+ ImmutableSet.of("cf1", "cf4"),
+ ImmutableList.of(new SstFileInfo("5", "af", "ai", "cf4")))
+ );
+ }
+
+ /**
+ * Builds a real (non-mocked) DifferSnapshotInfo whose version-to-SST-file map
+ * contains exactly one entry: the given version mapped to the given files.
+ * The path function returns the fixed relative path "src" for every version
+ * and the snapshot id is a random UUID. The third constructor argument is 0;
+ * presumably this is the snapshot generation (other constructor calls in this
+ * file pass a generation in that position) - confirm against the constructor.
+ * @param sstFiles SST files to register for the version.
+ * @param version version key under which the files are stored.
+ * @return the constructed DifferSnapshotInfo.
+ */
+ private DifferSnapshotInfo getDifferSnapshotInfoForVersion(List<SstFileInfo>
sstFiles, int version) {
+ TreeMap<Integer, List<SstFileInfo>> sourceSstFileMap = new TreeMap<>();
+ sourceSstFileMap.put(version, sstFiles);
+ return new DifferSnapshotInfo(v -> Paths.get("src"), UUID.randomUUID(), 0,
sourceSstFileMap);
+ }
+
+ /**
+ * Checks getSSTDiffList for two snapshots described only by their SST file
+ * lists. The source files are registered under version 0 and the destination
+ * files under version 1. The trailing boolean passed to getSSTDiffList is
+ * false; going by the test name this presumably disables use of the
+ * compaction DAG - confirm against the getSSTDiffList signature.
+ * An empty Optional result is mapped to null before comparison.
+ * @param description label of the case; not referenced in the test body.
+ * @param sourceSstFiles SST files of the source snapshot.
+ * @param destSstFiles SST files of the destination snapshot.
+ * @param prefixMap table name to key prefix, wrapped in a TablePrefixInfo.
+ * @param tablesToLookup tables passed to both snapshot versions and to getSSTDiffList.
+ * @param expectedDiffList expected result list, compared with assertEquals.
+ */
+ @ParameterizedTest
+ @MethodSource("getSSTDiffListWithoutCompactionDAGCase")
+ public void testGetSSTDiffListWithoutCompactionDag(String description,
List<SstFileInfo> sourceSstFiles,
+ List<SstFileInfo> destSstFiles, Map<String, String> prefixMap,
Set<String> tablesToLookup,
+ List<SstFileInfo> expectedDiffList) {
+ DifferSnapshotInfo sourceDSI =
getDifferSnapshotInfoForVersion(sourceSstFiles, 0);
+ DifferSnapshotVersion sourceVersion = new DifferSnapshotVersion(sourceDSI,
0, tablesToLookup);
+ DifferSnapshotInfo destDSI = getDifferSnapshotInfoForVersion(destSstFiles,
1);
+ DifferSnapshotVersion destVersion = new DifferSnapshotVersion(destDSI, 1,
tablesToLookup);
+ List<SstFileInfo> diffList =
rocksDBCheckpointDiffer.getSSTDiffList(sourceVersion, destVersion,
+ new TablePrefixInfo(prefixMap), tablesToLookup, false).orElse(null);
+ assertEquals(expectedDiffList, diffList);
+ }
+
/**
* Test cases for testGetSSTDiffListWithoutDB.
*/
@@ -504,21 +624,15 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
Arrays.asList("000105", "000095", "000088"),
Collections.singletonList("000107"))
);
-
- DifferSnapshotInfo snapshotInfo1 = new DifferSnapshotInfo(
- "/path/to/dbcp1", UUID.randomUUID(), 3008L, null,
Mockito.mock(ManagedRocksDB.class));
- DifferSnapshotInfo snapshotInfo2 = new DifferSnapshotInfo(
- "/path/to/dbcp2", UUID.randomUUID(), 14980L, null,
Mockito.mock(ManagedRocksDB.class));
- DifferSnapshotInfo snapshotInfo3 = new DifferSnapshotInfo(
- "/path/to/dbcp3", UUID.randomUUID(), 17975L, null,
Mockito.mock(ManagedRocksDB.class));
- DifferSnapshotInfo snapshotInfo4 = new DifferSnapshotInfo(
- "/path/to/dbcp4", UUID.randomUUID(), 18000L, null,
Mockito.mock(ManagedRocksDB.class));
+ Path baseDir =
dbDir.toPath().resolve("path").resolve("to").toAbsolutePath();
+ DifferSnapshotInfo snapshotInfo1 =
mockDifferSnapshotVersion(baseDir.resolve("dbcp1").toString(), 3008L);
+ DifferSnapshotInfo snapshotInfo2 =
mockDifferSnapshotVersion(baseDir.resolve("dbcp2").toString(), 14980L);
+ DifferSnapshotInfo snapshotInfo3 =
mockDifferSnapshotVersion(baseDir.resolve("dbcp3").toString(), 17975L);
+ DifferSnapshotInfo snapshotInfo4 =
mockDifferSnapshotVersion(baseDir.resolve("dbcp4").toString(), 18000L);
TablePrefixInfo prefixMap = new TablePrefixInfo(ImmutableMap.of("col1",
"c", "col2", "d"));
- DifferSnapshotInfo snapshotInfo5 = new DifferSnapshotInfo(
- "/path/to/dbcp2", UUID.randomUUID(), 0L, prefixMap,
Mockito.mock(ManagedRocksDB.class));
- DifferSnapshotInfo snapshotInfo6 = new DifferSnapshotInfo(
- "/path/to/dbcp2", UUID.randomUUID(), 100L, prefixMap,
Mockito.mock(ManagedRocksDB.class));
+ DifferSnapshotInfo snapshotInfo5 =
mockDifferSnapshotVersion(baseDir.resolve("dbcp2").toString(), 0L);
+ DifferSnapshotInfo snapshotInfo6 =
mockDifferSnapshotVersion(baseDir.resolve("dbcp2").toString(), 100L);
Set<String> snapshotSstFiles1 = ImmutableSet.of("000059", "000053");
Set<String> snapshotSstFiles2 = ImmutableSet.of("000088", "000059",
@@ -550,7 +664,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
"000095"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), null),
Arguments.of("Test 2: Compaction log file crafted input: " +
"One source ('to' snapshot) SST file is never compacted " +
"(newly flushed)",
@@ -563,7 +677,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
ImmutableSet.of("000108"),
ImmutableSet.of("000108"),
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), null),
Arguments.of("Test 3: Compaction log file crafted input: " +
"Same SST files found during SST expansion",
compactionLog,
@@ -575,7 +689,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
ImmutableSet.of("000066", "000059", "000053"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), null),
Arguments.of("Test 4: Compaction log file crafted input: " +
"Skipping known processed SST.",
compactionLog,
@@ -587,7 +701,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
- true, Collections.emptyMap()),
+ true, Collections.emptyMap(), null),
Arguments.of("Test 5: Compaction log file hit snapshot" +
" generation early exit condition",
compactionLog,
@@ -599,7 +713,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), null),
Arguments.of("Test 6: Compaction log table regular case. " +
"Expands expandable SSTs in the initial diff.",
null,
@@ -613,7 +727,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
"000095"),
ImmutableSet.of("000066", "000105", "000080", "000087", "000073",
"000095"),
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), null),
Arguments.of("Test 7: Compaction log table crafted input: " +
"One source ('to' snapshot) SST file is never compacted " +
"(newly flushed)",
@@ -626,7 +740,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
ImmutableSet.of("000088", "000105", "000059", "000053", "000095"),
ImmutableSet.of("000108"),
ImmutableSet.of("000108"),
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), null),
Arguments.of("Test 8: Compaction log table crafted input: " +
"Same SST files found during SST expansion",
null,
@@ -638,7 +752,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
ImmutableSet.of("000066", "000059", "000053"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
ImmutableSet.of("000080", "000087", "000073", "000095"),
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), null),
Arguments.of("Test 9: Compaction log table crafted input: " +
"Skipping known processed SST.",
null,
@@ -650,7 +764,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
Collections.emptySet(),
Collections.emptySet(),
Collections.emptySet(),
- true, Collections.emptyMap()),
+ true, Collections.emptyMap(), null),
Arguments.of("Test 10: Compaction log table hit snapshot " +
"generation early exit condition",
null,
@@ -662,7 +776,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
ImmutableSet.of("000059", "000053"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
ImmutableSet.of("000066", "000080", "000087", "000073", "000062"),
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), null),
Arguments.of("Test 11: Older Compaction log got pruned and source
snapshot delta files would be " +
"unreachable",
null,
@@ -674,7 +788,7 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
ImmutableSet.of("1", "3", "13", "14"),
ImmutableSet.of("2", "8", "9", "12"),
ImmutableSet.of("2", "8", "9", "12"),
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), prefixMap),
Arguments.of("Test 12: Older Compaction log got pruned and source
snapshot delta files would be " +
"unreachable",
null,
@@ -686,22 +800,22 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
ImmutableSet.of("3", "13", "14"),
ImmutableSet.of("4", "5", "8", "9", "12"),
null,
- false, Collections.emptyMap()),
+ false, Collections.emptyMap(), prefixMap),
Arguments.of("Test 13: Compaction log to test filtering logic based on
range and column family",
null,
getPrunedCompactionEntries(false,
- new HashMap<String, String[]>() {{
- put("1", new String[]{"a", "c", "col1"});
- put("3", new String[]{"a", "d", "col2"});
- put("13", new String[]{"a", "c", "col13"});
- put("14", new String[]{"a", "c", "col1"});
- put("2", new String[]{"a", "c", "col1"});
- put("4", new String[]{"a", "b", "col1"});
- put("5", new String[]{"b", "b", "col1"});
- put("10", new String[]{"a", "b", "col1"});
- put("8", new String[]{"a", "b", "col1"});
- put("6", new String[]{"a", "z", "col13"});
- put("7", new String[]{"a", "z", "col13"});
+ new HashMap<String, SstFileInfo>() {{
+ put("1", new SstFileInfo("1", "a", "c", "col1"));
+ put("3", new SstFileInfo("3", "a", "d", "col2"));
+ put("13", new SstFileInfo("13", "a", "c", "col13"));
+ put("14", new SstFileInfo("14", "a", "c", "col1"));
+ put("2", new SstFileInfo("2", "a", "c", "col1"));
+ put("4", new SstFileInfo("4", "a", "b", "col1"));
+ put("5", new SstFileInfo("5", "b", "b", "col1"));
+ put("10", new SstFileInfo("10", "a", "b", "col1"));
+ put("8", new SstFileInfo("8", "a", "b", "col1"));
+ put("6", new SstFileInfo("6", "a", "z", "col13"));
+ put("7", new SstFileInfo("7", "a", "z", "col13"));
}}),
snapshotInfo6,
snapshotInfo5,
@@ -712,12 +826,12 @@ private static Stream<Arguments>
casesGetSSTDiffListWithoutDB() {
ImmutableSet.of("2", "9", "12"),
false,
ImmutableMap.of(
- "2", new String[]{"a", "b", "col1"},
- "12", new String[]{"a", "d", "col2"},
- "8", new String[]{"a", "b", "col1"},
- "9", new String[]{"a", "c", "col1"},
- "15", new String[]{"a", "z", "col13"}
- ))
+ "2", new SstFileInfo("2", "a", "b", "col1"),
+ "12", new SstFileInfo("12", "a", "d", "col2"),
+ "8", new SstFileInfo("8", "a", "b", "col1"),
+ "9", new SstFileInfo("9", "a", "c", "col1"),
+ "15", new SstFileInfo("15", "a", "z", "col13")
+ ), prefixMap)
);
}
@@ -740,7 +854,8 @@ public void testGetSSTDiffListWithoutDB(String description,
Set<String> expectedDiffSstFiles,
Set<String> expectedSSTDiffFiles,
boolean expectingException,
- Map<String, String[]> metaDataMap) {
+ Map<String, SstFileInfo> metaDataMap,
+ TablePrefixInfo prefixInfo) {
boolean exceptionThrown = false;
if (compactionLog != null) {
@@ -756,15 +871,33 @@ public void testGetSSTDiffListWithoutDB(String
description,
}
rocksDBCheckpointDiffer.loadAllCompactionLogs();
- Set<String> actualSameSstFiles = new HashSet<>();
- Set<String> actualDiffSstFiles = new HashSet<>();
+ Set<String> tablesToLookup;
+ String dummyTable;
+ if (prefixInfo != null) {
+ tablesToLookup = prefixInfo.getTableNames();
+ dummyTable = tablesToLookup.stream().findAny().get();
+ } else {
+ tablesToLookup = mock(Set.class);
+ when(tablesToLookup.contains(anyString())).thenReturn(true);
+ dummyTable = "dummy";
+ }
+ Map<String, SstFileInfo> actualSameSstFiles = new HashMap<>();
+ Map<String, SstFileInfo> actualDiffSstFiles = new HashMap<>();
+ List<SstFileInfo> sourceSnapshotFiles = srcSnapshotSstFiles.stream()
+ .map(fileName -> new SstFileInfo(fileName, "", "", dummyTable))
+ .collect(Collectors.toList());
+ List<SstFileInfo> destSnapshotFiles = destSnapshotSstFiles.stream()
+ .map(fileName -> new SstFileInfo(fileName, "", "", dummyTable))
+ .collect(Collectors.toList());
+ when(srcSnapshot.getSstFiles(eq(0),
eq(tablesToLookup))).thenReturn(sourceSnapshotFiles);
+ when(destSnapshot.getSstFiles(eq(0),
eq(tablesToLookup))).thenReturn(destSnapshotFiles);
+ DifferSnapshotVersion srcVersion = new DifferSnapshotVersion(srcSnapshot,
0, tablesToLookup);
+ DifferSnapshotVersion destVersion = new
DifferSnapshotVersion(destSnapshot, 0, tablesToLookup);
try {
rocksDBCheckpointDiffer.internalGetSSTDiffList(
- srcSnapshot,
- destSnapshot,
- srcSnapshotSstFiles,
- destSnapshotSstFiles,
+ srcVersion,
+ destVersion,
actualSameSstFiles,
actualDiffSstFiles);
} catch (RuntimeException rtEx) {
@@ -780,57 +913,30 @@ public void testGetSSTDiffListWithoutDB(String
description,
}
// Check same and different SST files result
- assertEquals(expectedSameSstFiles, actualSameSstFiles);
- assertEquals(expectedDiffSstFiles, actualDiffSstFiles);
- try (MockedStatic<RdbUtil> mockedHandler =
Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS)) {
- RocksDB rocksDB = Mockito.mock(RocksDB.class);
- Mockito.when(rocksDB.getName()).thenReturn("dummy");
- Mockito.when(srcSnapshot.getRocksDB().get()).thenReturn(rocksDB);
- Mockito.when(destSnapshot.getRocksDB().get()).thenReturn(rocksDB);
- Mockito.when(srcSnapshot.getRocksDB().getLiveMetadataForSSTFiles())
- .thenAnswer(invocation ->
srcSnapshotSstFiles.stream().filter(metaDataMap::containsKey).map(file -> {
- LiveFileMetaData liveFileMetaData =
Mockito.mock(LiveFileMetaData.class);
- String[] metaData = metaDataMap.get(file);
- Mockito.when(liveFileMetaData.fileName()).thenReturn("/" + file +
SST_FILE_EXTENSION);
-
Mockito.when(liveFileMetaData.smallestKey()).thenReturn(metaData[0].getBytes(UTF_8));
-
Mockito.when(liveFileMetaData.largestKey()).thenReturn(metaData[1].getBytes(UTF_8));
-
Mockito.when(liveFileMetaData.columnFamilyName()).thenReturn(metaData[2].getBytes(UTF_8));
- return liveFileMetaData;
- }).collect(Collectors.toMap(liveFileMetaData ->
FilenameUtils.getBaseName(liveFileMetaData.fileName()),
- Function.identity())));
- Set<String> tablesToLookup;
- String dummyTable;
- if (srcSnapshot.getTablePrefixes() != null) {
- tablesToLookup = srcSnapshot.getTablePrefixes().getTableNames();
- dummyTable = tablesToLookup.stream().findAny().get();
+ assertEquals(expectedSameSstFiles, actualSameSstFiles.keySet());
+ assertEquals(expectedDiffSstFiles, actualDiffSstFiles.keySet());
+ when(srcSnapshot.getSstFiles(eq(0), eq(tablesToLookup)))
+ .thenAnswer(invocation -> srcSnapshotSstFiles.stream()
+ .map(file -> metaDataMap.getOrDefault(file, new SstFileInfo(file,
null, null, null)))
+ .collect(Collectors.toList()));
+ when(destSnapshot.getSstFiles(eq(0), eq(tablesToLookup)))
+ .thenAnswer(invocation -> destSnapshotSstFiles.stream()
+ .map(file -> metaDataMap.getOrDefault(file, new SstFileInfo(file,
null, null, null)))
+ .collect(Collectors.toList()));
+
+ try {
+ Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles)
+ .map(files ->
files.stream().sorted().collect(Collectors.toList())).orElse(null),
+ rocksDBCheckpointDiffer.getSSTDiffList(
+ new DifferSnapshotVersion(srcSnapshot, 0, tablesToLookup),
+ new DifferSnapshotVersion(destSnapshot, 0, tablesToLookup),
prefixInfo, tablesToLookup,
+ true)
+ .map(i ->
i.stream().map(SstFileInfo::getFileName).sorted().collect(Collectors.toList())).orElse(null));
+ } catch (RuntimeException rtEx) {
+ if (!expectingException) {
+ fail("Unexpected exception thrown in test.");
} else {
- tablesToLookup = mock(Set.class);
- when(tablesToLookup.contains(anyString())).thenReturn(true);
- dummyTable = "dummy";
- }
- mockedHandler.when(() -> RdbUtil.getLiveSSTFilesForCFs(any(), any()))
- .thenAnswer(i -> {
- Set<String> sstFiles =
i.getArgument(0).equals(srcSnapshot.getRocksDB()) ? srcSnapshotSstFiles
- : destSnapshotSstFiles;
- return sstFiles.stream().map(fileName -> {
- LiveFileMetaData liveFileMetaData = mock(LiveFileMetaData.class);
- when(liveFileMetaData.fileName()).thenReturn("/" + fileName +
SST_FILE_EXTENSION);
-
when(liveFileMetaData.columnFamilyName()).thenReturn(dummyTable.getBytes(UTF_8));
- return liveFileMetaData;
- }).collect(Collectors.toList());
- });
- try {
- Assertions.assertEquals(Optional.ofNullable(expectedSSTDiffFiles)
- .map(files ->
files.stream().sorted().collect(Collectors.toList())).orElse(null),
- rocksDBCheckpointDiffer.getSSTDiffList(srcSnapshot, destSnapshot,
tablesToLookup)
- .map(i ->
i.stream().sorted().collect(Collectors.toList())).orElse(null));
- } catch (RuntimeException rtEx) {
- if (!expectingException) {
- rtEx.printStackTrace();
- fail("Unexpected exception thrown in test.");
- } else {
- exceptionThrown = true;
- }
+ exceptionThrown = true;
}
}
if (expectingException && !exceptionThrown) {
@@ -878,19 +984,6 @@ void testDifferWithDB() throws Exception {
if (LOG.isDebugEnabled()) {
rocksDBCheckpointDiffer.dumpCompactionNodeTable();
}
-
- cleanUpSnapshots();
- }
-
- public void cleanUpSnapshots() {
- for (DifferSnapshotInfo snap : snapshots) {
- snap.getRocksDB().close();
- }
- for (List<ColumnFamilyHandle> colHandle : colHandles) {
- for (ColumnFamilyHandle handle : colHandle) {
- handle.close();
- }
- }
}
private static List<ColumnFamilyDescriptor> getColumnFamilyDescriptors() {
@@ -941,18 +1034,21 @@ void diffAllSnapshots(RocksDBCheckpointDiffer differ)
if
(rocksDBCheckpointDiffer.getCompactionNodeMap().containsKey(diffFile)) {
columnFamily =
rocksDBCheckpointDiffer.getCompactionNodeMap().get(diffFile).getColumnFamily();
} else {
- columnFamily =
bytes2String(src.getRocksDB().getLiveMetadataForSSTFiles().get(diffFile).columnFamilyName());
+ columnFamily = src.getSstFile(0, diffFile).getColumnFamily();
}
if (columnFamily == null || tableToLookUp.contains(columnFamily)) {
expectedDiffFiles.add(diffFile);
}
}
- List<String> sstDiffList = differ.getSSTDiffList(src, snap,
tableToLookUp).orElse(Collections.emptyList());
+ DifferSnapshotVersion srcSnapVersion = new DifferSnapshotVersion(src,
0, tableToLookUp);
+ DifferSnapshotVersion destSnapVersion = new
DifferSnapshotVersion(snap, 0, tableToLookUp);
+ List<SstFileInfo> sstDiffList = differ.getSSTDiffList(srcSnapVersion,
destSnapVersion, null,
+ tableToLookUp, true).orElse(Collections.emptyList());
LOG.info("SST diff list from '{}' to '{}': {} tables: {}",
- src.getDbPath(), snap.getDbPath(), sstDiffList, tableToLookUp);
-
- assertEquals(expectedDiffFiles, sstDiffList);
+ src.getDbPath(0), snap.getDbPath(0), sstDiffList, tableToLookUp);
+ assertEquals(expectedDiffFiles,
sstDiffList.stream().map(SstFileInfo::getFileName)
+ .collect(Collectors.toList()));
}
++index;
@@ -980,12 +1076,14 @@ private void createCheckpoint(ManagedRocksDB rocksDB)
throws RocksDBException {
createCheckPoint(ACTIVE_DB_DIR_NAME, cpPath, rocksDB);
final UUID snapshotId = UUID.randomUUID();
List<ColumnFamilyHandle> colHandle = new ArrayList<>();
- colHandles.add(colHandle);
- final DifferSnapshotInfo currentSnapshot =
- new DifferSnapshotInfo(cpPath, snapshotId, snapshotGeneration, null,
- ManagedRocksDB.openReadOnly(cpPath, getColumnFamilyDescriptors(),
- colHandle));
- this.snapshots.add(currentSnapshot);
+ try (ManagedRocksDB rdb = ManagedRocksDB.openReadOnly(cpPath,
getColumnFamilyDescriptors(), colHandle)) {
+ TreeMap<Integer, List<SstFileInfo>> versionSstFilesMap = new TreeMap<>();
+ versionSstFilesMap.put(0,
rdb.getLiveMetadataForSSTFiles().values().stream().map(SstFileInfo::new)
+ .collect(Collectors.toList()));
+ final DifferSnapshotInfo currentSnapshot = new
DifferSnapshotInfo((version) -> Paths.get(cpPath),
+ snapshotId, snapshotGeneration, versionSstFilesMap);
+ this.snapshots.add(currentSnapshot);
+ }
long t2 = Time.monotonicNow();
LOG.trace("Current time: " + t2);
@@ -1347,18 +1445,18 @@ private static CompactionLogEntry
createCompactionEntry(long dbSequenceNumber,
long compactionTime,
List<String>
inputFiles,
List<String>
outputFiles,
- Map<String,
String[]> metadata) {
+ Map<String,
SstFileInfo> metadata) {
return new CompactionLogEntry.Builder(dbSequenceNumber, compactionTime,
toFileInfoList(inputFiles, metadata), toFileInfoList(outputFiles,
metadata)).build();
}
private static List<CompactionFileInfo> toFileInfoList(List<String> files,
- Map<String, String[]>
metadata) {
+ Map<String,
SstFileInfo> metadata) {
return files.stream()
.map(fileName -> new CompactionFileInfo.Builder(fileName)
-
.setStartRange(Optional.ofNullable(metadata.get(fileName)).map(meta ->
meta[0]).orElse(null))
- .setEndRange(Optional.ofNullable(metadata.get(fileName)).map(meta
-> meta[1]).orElse(null))
-
.setColumnFamily(Optional.ofNullable(metadata.get(fileName)).map(meta ->
meta[2]).orElse(null))
+
.setStartRange(Optional.ofNullable(metadata.get(fileName)).map(SstFileInfo::getStartKey).orElse(null))
+
.setEndRange(Optional.ofNullable(metadata.get(fileName)).map(SstFileInfo::getEndKey).orElse(null))
+
.setColumnFamily(Optional.ofNullable(metadata.get(fileName)).map(SstFileInfo::getColumnFamily).orElse(null))
.build())
.collect(Collectors.toList());
}
@@ -1621,25 +1719,36 @@ public void testGetSSTDiffListWithoutDB2(
// Snapshot is used for logging purpose and short-circuiting traversal.
// Using gen 0 for this test.
+ List<SstFileInfo> srcSnapshotSstFileInfoSet = srcSnapshotSstFiles.stream()
+ .map(fileName -> new SstFileInfo(fileName, "", "",
"cf1")).collect(Collectors.toList());
+ List<SstFileInfo> destSnapshotSstFileInfoSet =
destSnapshotSstFiles.stream()
+ .map(fileName -> new SstFileInfo(fileName, "", "",
"cf1")).collect(Collectors.toList());
+ TreeMap<Integer, List<SstFileInfo>> srcSnapshotSstFileInfoMap = new
TreeMap<>();
+ srcSnapshotSstFileInfoMap.put(0, srcSnapshotSstFileInfoSet);
+ TreeMap<Integer, List<SstFileInfo>> destSnapshotSstFileInfoMap = new
TreeMap<>();
+ destSnapshotSstFileInfoMap.put(0, destSnapshotSstFileInfoSet);
+ Path path1 =
dbDir.toPath().resolve("path").resolve("to").resolve("dbcp1").toAbsolutePath();
+ Path path2 =
dbDir.toPath().resolve("path").resolve("to").resolve("dbcp2").toAbsolutePath();
DifferSnapshotInfo mockedSourceSnapshot = new DifferSnapshotInfo(
- "/path/to/dbcp1", UUID.randomUUID(), 0L, columnFamilyPrefixInfo, null);
+ (version) -> path1, UUID.randomUUID(), 0L, srcSnapshotSstFileInfoMap);
DifferSnapshotInfo mockedDestinationSnapshot = new DifferSnapshotInfo(
- "/path/to/dbcp2", UUID.randomUUID(), 0L, columnFamilyPrefixInfo, null);
-
- Set<String> actualSameSstFiles = new HashSet<>();
- Set<String> actualDiffSstFiles = new HashSet<>();
-
+ (version) -> path2, UUID.randomUUID(), 0L, destSnapshotSstFileInfoMap);
+
+ Map<String, SstFileInfo> actualSameSstFiles = new HashMap<>();
+ Map<String, SstFileInfo> actualDiffSstFiles = new HashMap<>();
+ DifferSnapshotVersion srcSnapshotVersion = new
DifferSnapshotVersion(mockedSourceSnapshot, 0,
+ Collections.singleton("cf1"));
+ DifferSnapshotVersion destSnapshotVersion = new
DifferSnapshotVersion(mockedDestinationSnapshot, 0,
+ Collections.singleton("cf1"));
rocksDBCheckpointDiffer.internalGetSSTDiffList(
- mockedSourceSnapshot,
- mockedDestinationSnapshot,
- srcSnapshotSstFiles,
- destSnapshotSstFiles,
+ srcSnapshotVersion,
+ destSnapshotVersion,
actualSameSstFiles,
actualDiffSstFiles);
// Check same and different SST files result
- assertEquals(expectedSameSstFiles, actualSameSstFiles);
- assertEquals(expectedDiffSstFiles, actualDiffSstFiles);
+ assertEquals(expectedSameSstFiles, actualSameSstFiles.keySet());
+ assertEquals(expectedDiffSstFiles, actualDiffSstFiles.keySet());
}
private static Stream<Arguments> shouldSkipNodeCases() {
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
index a44baf1905f..08ff90ab6cc 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDiffUtils.java
@@ -24,27 +24,21 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
import org.assertj.core.util.Sets;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
-import org.mockito.Mockito;
-import org.rocksdb.LiveFileMetaData;
-import org.rocksdb.RocksDB;
/**
* Class to test RocksDiffUtils.
@@ -80,9 +74,6 @@ public void testFilterFunction() {
public static Stream<Arguments> values() {
return Stream.of(
- arguments("validColumnFamily", "invalidColumnFamily", "a", "d", "b",
"f"),
- arguments("validColumnFamily", "invalidColumnFamily", "a", "d", "e",
"f"),
- arguments("validColumnFamily", "invalidColumnFamily", "a", "d", "a",
"f"),
arguments("validColumnFamily", "validColumnFamily", "a", "d", "e",
"g"),
arguments("validColumnFamily", "validColumnFamily", "e", "g", "a",
"d"),
arguments("validColumnFamily", "validColumnFamily", "b", "b", "e",
"g"),
@@ -92,95 +83,40 @@ public static Stream<Arguments> values() {
@ParameterizedTest
@MethodSource("values")
- public void testFilterRelevantSstFilesWithPreExistingCompactionInfo(String
validSSTColumnFamilyName,
- String
invalidColumnFamilyName,
- String
validSSTFileStartRange,
- String
validSSTFileEndRange,
- String
invalidSSTFileStartRange,
- String
invalidSSTFileEndRange) {
+ public void testFilterRelevantSstFilesMap(String validSSTColumnFamilyName,
String invalidColumnFamilyName,
+ String validSSTFileStartRange, String validSSTFileEndRange, String
invalidSSTFileStartRange,
+ String invalidSSTFileEndRange) {
String validSstFile = "filePath/validSSTFile.sst";
String invalidSstFile = "filePath/invalidSSTFile.sst";
String untrackedSstFile = "filePath/untrackedSSTFile.sst";
String expectedPrefix =
String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) +
validSSTFileStartRange.charAt(0)) / 2));
- Set<String> sstFile = Sets.newTreeSet(validSstFile, invalidSstFile,
untrackedSstFile);
- Set<String> inputSstFiles = new HashSet<>();
+ Map<String, SstFileInfo> sstFile = ImmutableMap.of(
+ validSstFile, new SstFileInfo(validSstFile, validSSTFileStartRange,
validSSTFileEndRange,
+ validSSTColumnFamilyName), invalidSstFile, new
SstFileInfo(invalidSstFile, invalidSSTFileStartRange,
+ invalidSSTFileEndRange, invalidColumnFamilyName), untrackedSstFile,
+ new SstFileInfo(untrackedSstFile, null, null, null));
+ Map<String, SstFileInfo> inputSstFiles = new HashMap<>();
List<Set<String>> tablesToLookupSet =
Arrays.asList(ImmutableSet.of(validSSTColumnFamilyName),
ImmutableSet.of(invalidColumnFamilyName),
ImmutableSet.of(validSSTColumnFamilyName, invalidColumnFamilyName),
Collections.emptySet());
for (Set<String> tablesToLookup : tablesToLookupSet) {
inputSstFiles.clear();
- inputSstFiles.addAll(sstFile);
+ inputSstFiles.putAll(sstFile);
RocksDiffUtils.filterRelevantSstFiles(inputSstFiles,
+ tablesToLookup,
new TablePrefixInfo(
new HashMap<String, String>() {{
put(invalidColumnFamilyName,
getLexicographicallyHigherString(invalidSSTFileEndRange));
put(validSSTColumnFamilyName, expectedPrefix);
- }}), ImmutableMap.of("validSSTFile", new
CompactionNode(validSstFile, 0, validSSTFileStartRange,
- validSSTFileEndRange, validSSTColumnFamilyName),
"invalidSSTFile",
- new CompactionNode(invalidSstFile, 0, invalidSSTFileStartRange,
- invalidSSTFileEndRange, invalidColumnFamilyName)),
tablesToLookup);
+ }}));
if (tablesToLookup.contains(validSSTColumnFamilyName)) {
- Assertions.assertEquals(Sets.newTreeSet(validSstFile,
untrackedSstFile), inputSstFiles,
+ Assertions.assertEquals(Sets.newTreeSet(validSstFile,
untrackedSstFile), inputSstFiles.keySet(),
"Failed for " + tablesToLookup);
} else {
- Assertions.assertEquals(Sets.newTreeSet(untrackedSstFile),
inputSstFiles, "Failed for " + tablesToLookup);
- }
- }
- }
-
- private LiveFileMetaData getMockedLiveFileMetadata(String columnFamilyName,
String startRange,
- String endRange,
- String name) {
- LiveFileMetaData liveFileMetaData = Mockito.mock(LiveFileMetaData.class);
-
Mockito.when(liveFileMetaData.largestKey()).thenReturn(endRange.getBytes(StandardCharsets.UTF_8));
-
Mockito.when(liveFileMetaData.columnFamilyName()).thenReturn(columnFamilyName.getBytes(StandardCharsets.UTF_8));
-
Mockito.when(liveFileMetaData.smallestKey()).thenReturn(startRange.getBytes(StandardCharsets.UTF_8));
- Mockito.when(liveFileMetaData.fileName()).thenReturn("basePath/" + name +
".sst");
- return liveFileMetaData;
- }
-
- @ParameterizedTest
- @MethodSource("values")
- public void testFilterRelevantSstFilesFromDB(String validSSTColumnFamilyName,
- String invalidColumnFamilyName,
- String validSSTFileStartRange,
- String validSSTFileEndRange,
- String invalidSSTFileStartRange,
- String invalidSSTFileEndRange) {
- for (int numberOfDBs = 1; numberOfDBs < 10; numberOfDBs++) {
- String validSstFile = "filePath/validSSTFile.sst";
- String invalidSstFile = "filePath/invalidSSTFile.sst";
- String untrackedSstFile = "filePath/untrackedSSTFile.sst";
- int expectedDBKeyIndex = numberOfDBs / 2;
- ManagedRocksDB[] rocksDBs =
- IntStream.range(0, numberOfDBs).mapToObj(i ->
Mockito.mock(ManagedRocksDB.class))
- .collect(Collectors.toList()).toArray(new
ManagedRocksDB[numberOfDBs]);
- for (int i = 0; i < numberOfDBs; i++) {
- ManagedRocksDB managedRocksDB = rocksDBs[i];
- RocksDB mockedRocksDB = Mockito.mock(RocksDB.class);
- Mockito.when(managedRocksDB.get()).thenReturn(mockedRocksDB);
- if (i == expectedDBKeyIndex) {
- LiveFileMetaData validLiveFileMetaData =
getMockedLiveFileMetadata(validSSTColumnFamilyName,
- validSSTFileStartRange, validSSTFileEndRange, "validSSTFile");
- LiveFileMetaData invalidLiveFileMetaData =
getMockedLiveFileMetadata(invalidColumnFamilyName,
- invalidSSTFileStartRange, invalidSSTFileEndRange,
"invalidSSTFile");
- List<LiveFileMetaData> liveFileMetaDatas =
Arrays.asList(validLiveFileMetaData, invalidLiveFileMetaData);
-
Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(liveFileMetaDatas);
- } else {
-
Mockito.when(mockedRocksDB.getLiveFilesMetaData()).thenReturn(Collections.emptyList());
- }
- Mockito.when(managedRocksDB.getLiveMetadataForSSTFiles())
- .thenAnswer(invocation ->
ManagedRocksDB.getLiveMetadataForSSTFiles(mockedRocksDB));
+ Assertions.assertEquals(Sets.newTreeSet(untrackedSstFile),
inputSstFiles.keySet(),
+ "Failed for " + tablesToLookup);
}
-
- String expectedPrefix =
String.valueOf((char)(((int)validSSTFileEndRange.charAt(0) +
- validSSTFileStartRange.charAt(0)) / 2));
- Set<String> sstFile = Sets.newTreeSet(validSstFile, invalidSstFile,
untrackedSstFile);
- RocksDiffUtils.filterRelevantSstFiles(sstFile, new
TablePrefixInfo(ImmutableMap.of(validSSTColumnFamilyName,
- expectedPrefix)), Collections.emptyMap(),
- ImmutableSet.of(validSSTColumnFamilyName), rocksDBs);
- Assertions.assertEquals(Sets.newTreeSet(validSstFile, untrackedSstFile),
sstFile);
}
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
index 069ecccdd60..3901eeeb0e4 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOMSnapshotDAG.java
@@ -17,11 +17,13 @@
package org.apache.hadoop.ozone.freon;
+import static java.util.stream.Collectors.toMap;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_S3_VOLUME_NAME_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_LOG_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.DB_COMPACTION_SST_BACKUP_DIR;
import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIFF_DIR;
+import static
org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL;
import static
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COLUMN_FAMILIES_TO_TRACK_IN_DAG;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -34,14 +36,16 @@
import java.time.Duration;
import java.util.Collections;
import java.util.List;
-import java.util.concurrent.TimeoutException;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.hdds.utils.db.RDBStore;
-import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.TablePrefixInfo;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -56,8 +60,11 @@
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
+import
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.DifferSnapshotVersion;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -98,6 +105,7 @@ public static void init() throws Exception {
conf.setFromObject(raftClientConfig);
// Enable filesystem snapshot feature for the test regardless of the
default
conf.setBoolean(OMConfigKeys.OZONE_FILESYSTEM_SNAPSHOT_ENABLED_KEY, true);
+ conf.setInt(OZONE_SNAPSHOT_DEFRAG_SERVICE_INTERVAL, -1);
// Set DB CF write buffer to a much lower value so that flush and
compaction
// happens much more frequently without having to create a lot of keys.
@@ -133,9 +141,9 @@ private static String getSnapshotDBKey(String volumeName,
String bucketName,
return dbKeyPrefix + OM_KEY_PREFIX + snapshotName;
}
- private DifferSnapshotInfo getDifferSnapshotInfo(
- OMMetadataManager omMetadataManager, String volumeName, String
bucketName,
- String snapshotName, ManagedRocksDB snapshotDB) throws IOException {
+ private DifferSnapshotVersion getDifferSnapshotInfo(
+ OMMetadataManager omMetadataManager, OmSnapshotLocalDataManager
localDataManager,
+ String volumeName, String bucketName, String snapshotName) throws
IOException {
final String dbKey = getSnapshotDBKey(volumeName, bucketName,
snapshotName);
final SnapshotInfo snapshotInfo =
@@ -144,16 +152,22 @@ private DifferSnapshotInfo getDifferSnapshotInfo(
// Use RocksDB transaction sequence number in SnapshotInfo, which is
// persisted at the time of snapshot creation, as the snapshot generation
- return new DifferSnapshotInfo(checkpointPath, snapshotInfo.getSnapshotId(),
- snapshotInfo.getDbTxSequenceNumber(),
- omMetadataManager.getTableBucketPrefix(volumeName, bucketName),
- snapshotDB);
+ try (OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider
snapshotLocalData =
+ localDataManager.getOmSnapshotLocalData(snapshotInfo)) {
+ NavigableMap<Integer, List<SstFileInfo>> versionSstFiles =
snapshotLocalData.getSnapshotLocalData()
+ .getVersionSstFileInfos().entrySet().stream()
+ .collect(toMap(Map.Entry::getKey, entry ->
entry.getValue().getSstFiles(),
+ (u, v) -> {
+ throw new IllegalStateException(String.format("Duplicate key
%s", u));
+ }, TreeMap::new));
+ DifferSnapshotInfo dsi = new DifferSnapshotInfo((version) ->
Paths.get(checkpointPath),
+ snapshotInfo.getSnapshotId(), snapshotInfo.getDbTxSequenceNumber(),
versionSstFiles);
+ return new DifferSnapshotVersion(dsi, 0,
COLUMN_FAMILIES_TO_TRACK_IN_DAG);
+ }
}
@Test
- public void testDAGReconstruction()
- throws IOException, InterruptedException, TimeoutException {
-
+ public void testDAGReconstruction() throws IOException {
// Generate keys
RandomKeyGenerator randomKeyGenerator =
new RandomKeyGenerator(cluster.getConf());
@@ -200,28 +214,22 @@ public void testDAGReconstruction()
// Get snapshot SST diff list
OzoneManager ozoneManager = cluster.getOzoneManager();
OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+ TablePrefixInfo bucketPrefix =
omMetadataManager.getTableBucketPrefix(volumeName, bucketName);
+ OmSnapshotLocalDataManager localDataManager =
ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager();
RDBStore rdbStore = (RDBStore) omMetadataManager.getStore();
RocksDBCheckpointDiffer differ = rdbStore.getRocksDBCheckpointDiffer();
UncheckedAutoCloseableSupplier<OmSnapshot> snapDB1 =
ozoneManager.getOmSnapshotManager()
.getActiveSnapshot(volumeName, bucketName, "snap1");
UncheckedAutoCloseableSupplier<OmSnapshot> snapDB2 =
ozoneManager.getOmSnapshotManager()
.getActiveSnapshot(volumeName, bucketName, "snap2");
- DifferSnapshotInfo snap1 = getDifferSnapshotInfo(omMetadataManager,
- volumeName, bucketName, "snap1",
- ((RDBStore) snapDB1.get()
- .getMetadataManager().getStore()).getDb().getManagedRocksDb());
- DifferSnapshotInfo snap2 = getDifferSnapshotInfo(omMetadataManager,
- volumeName, bucketName, "snap2", ((RDBStore) snapDB2.get()
- .getMetadataManager().getStore()).getDb().getManagedRocksDb());
+ DifferSnapshotVersion snap1 = getDifferSnapshotInfo(omMetadataManager,
localDataManager,
+ volumeName, bucketName, "snap1");
+ DifferSnapshotVersion snap2 = getDifferSnapshotInfo(omMetadataManager,
localDataManager,
+ volumeName, bucketName, "snap2");
// RocksDB does checkpointing in a separate thread, wait for it
- final File checkpointSnap1 = new File(snap1.getDbPath());
- GenericTestUtils.waitFor(checkpointSnap1::exists, 2000, 20000);
- final File checkpointSnap2 = new File(snap2.getDbPath());
- GenericTestUtils.waitFor(checkpointSnap2::exists, 2000, 20000);
-
- List<String> sstDiffList21 = differ.getSSTDiffList(snap2, snap1,
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
- .orElse(Collections.emptyList());
+ List<SstFileInfo> sstDiffList21 = differ.getSSTDiffList(snap2, snap1,
bucketPrefix,
+ COLUMN_FAMILIES_TO_TRACK_IN_DAG,
true).orElse(Collections.emptyList());
LOG.debug("Got diff list: {}", sstDiffList21);
// Delete 1000 keys, take a 3rd snapshot, and do another diff
@@ -233,23 +241,19 @@ public void testDAGReconstruction()
LOG.debug("Snapshot created: {}", resp);
UncheckedAutoCloseableSupplier<OmSnapshot> snapDB3 =
ozoneManager.getOmSnapshotManager()
.getActiveSnapshot(volumeName, bucketName, "snap3");
- DifferSnapshotInfo snap3 = getDifferSnapshotInfo(omMetadataManager,
- volumeName, bucketName, "snap3",
- ((RDBStore) snapDB3.get()
- .getMetadataManager().getStore()).getDb().getManagedRocksDb());
- final File checkpointSnap3 = new File(snap3.getDbPath());
- GenericTestUtils.waitFor(checkpointSnap3::exists, 2000, 20000);
+ DifferSnapshotVersion snap3 = getDifferSnapshotInfo(omMetadataManager,
localDataManager, volumeName, bucketName,
+ "snap3");
- List<String> sstDiffList32 = differ.getSSTDiffList(snap3, snap2,
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
- .orElse(Collections.emptyList());
+ List<SstFileInfo> sstDiffList32 = differ.getSSTDiffList(snap3, snap2,
bucketPrefix,
+ COLUMN_FAMILIES_TO_TRACK_IN_DAG,
true).orElse(Collections.emptyList());
// snap3-snap1 diff result is a combination of snap3-snap2 and snap2-snap1
- List<String> sstDiffList31 = differ.getSSTDiffList(snap3, snap1,
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
- .orElse(Collections.emptyList());
+ List<SstFileInfo> sstDiffList31 = differ.getSSTDiffList(snap3, snap1,
bucketPrefix,
+ COLUMN_FAMILIES_TO_TRACK_IN_DAG,
true).orElse(Collections.emptyList());
// Same snapshot. Result should be empty list
- List<String> sstDiffList22 = differ.getSSTDiffList(snap2, snap2,
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
- .orElse(Collections.emptyList());
+ List<SstFileInfo> sstDiffList22 = differ.getSSTDiffList(snap2, snap2,
bucketPrefix,
+ COLUMN_FAMILIES_TO_TRACK_IN_DAG,
true).orElse(Collections.emptyList());
assertThat(sstDiffList22).isEmpty();
snapDB1.close();
snapDB2.close();
@@ -258,33 +262,29 @@ public void testDAGReconstruction()
cluster.restartOzoneManager();
ozoneManager = cluster.getOzoneManager();
omMetadataManager = ozoneManager.getMetadataManager();
+ localDataManager =
ozoneManager.getOmSnapshotManager().getSnapshotLocalDataManager();
snapDB1 = ozoneManager.getOmSnapshotManager()
.getActiveSnapshot(volumeName, bucketName, "snap1");
snapDB2 = ozoneManager.getOmSnapshotManager()
.getActiveSnapshot(volumeName, bucketName, "snap2");
- snap1 = getDifferSnapshotInfo(omMetadataManager,
- volumeName, bucketName, "snap1",
- ((RDBStore) snapDB1.get()
- .getMetadataManager().getStore()).getDb().getManagedRocksDb());
- snap2 = getDifferSnapshotInfo(omMetadataManager,
- volumeName, bucketName, "snap2", ((RDBStore) snapDB2.get()
- .getMetadataManager().getStore()).getDb().getManagedRocksDb());
+ snap1 = getDifferSnapshotInfo(omMetadataManager, localDataManager,
+ volumeName, bucketName, "snap1");
+ snap2 = getDifferSnapshotInfo(omMetadataManager, localDataManager,
+ volumeName, bucketName, "snap2");
snapDB3 = ozoneManager.getOmSnapshotManager()
.getActiveSnapshot(volumeName, bucketName, "snap3");
- snap3 = getDifferSnapshotInfo(omMetadataManager,
- volumeName, bucketName, "snap3",
- ((RDBStore) snapDB3.get()
- .getMetadataManager().getStore()).getDb().getManagedRocksDb());
- List<String> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1,
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
- .orElse(Collections.emptyList());
+ snap3 = getDifferSnapshotInfo(omMetadataManager, localDataManager,
+ volumeName, bucketName, "snap3");
+ List<SstFileInfo> sstDiffList21Run2 = differ.getSSTDiffList(snap2, snap1,
bucketPrefix,
+ COLUMN_FAMILIES_TO_TRACK_IN_DAG,
true).orElse(Collections.emptyList());
assertEquals(sstDiffList21, sstDiffList21Run2);
- List<String> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2,
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
- .orElse(Collections.emptyList());
+ List<SstFileInfo> sstDiffList32Run2 = differ.getSSTDiffList(snap3, snap2,
bucketPrefix,
+ COLUMN_FAMILIES_TO_TRACK_IN_DAG,
true).orElse(Collections.emptyList());
assertEquals(sstDiffList32, sstDiffList32Run2);
- List<String> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1,
COLUMN_FAMILIES_TO_TRACK_IN_DAG)
- .orElse(Collections.emptyList());
+ List<SstFileInfo> sstDiffList31Run2 = differ.getSSTDiffList(snap3, snap1,
bucketPrefix,
+ COLUMN_FAMILIES_TO_TRACK_IN_DAG,
true).orElse(Collections.emptyList());
assertEquals(sstDiffList31, sstDiffList31Run2);
snapDB1.close();
snapDB2.close();
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index db5ac4d6340..3e6ccf771dc 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -313,7 +313,7 @@ public OmSnapshotManager(OzoneManager ozoneManager) throws
IOException {
cacheCleanupServiceInterval, compactNonSnapshotDiffTables,
ozoneManager.getMetadataManager().getLock());
this.snapshotDiffManager = new SnapshotDiffManager(snapshotDiffDb, differ,
- ozoneManager, snapDiffJobCf, snapDiffReportCf,
+ ozoneManager, snapshotLocalDataManager, snapDiffJobCf,
snapDiffReportCf,
columnFamilyOptions, codecRegistry);
diffCleanupServiceInterval = ozoneManager.getConfiguration()
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
index e5bc8dcfa91..219fc01f0a5 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/snapshot/SnapshotDiffManager.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.om.snapshot;
+import static java.util.stream.Collectors.toMap;
import static org.apache.commons.lang3.StringUtils.leftPad;
import static
org.apache.hadoop.hdds.StringUtils.getLexicographicallyHigherString;
import static
org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType.CREATE;
@@ -60,6 +61,7 @@
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.OBJECT_ID_MAP_GEN_OBS;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_DAG_WALK;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.SubStatus.SST_FILE_DELTA_FULL_DIFF;
+import static org.apache.ozone.rocksdiff.RocksDiffUtils.filterRelevantSstFiles;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
@@ -78,9 +80,11 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.NavigableMap;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
@@ -90,7 +94,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.file.PathUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -109,6 +112,8 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmDirectoryInfo;
@@ -117,6 +122,7 @@
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.helpers.WithObjectID;
import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
+import
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider;
import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse;
import org.apache.hadoop.ozone.snapshot.ListSnapshotDiffJobResponse;
import org.apache.hadoop.ozone.snapshot.SnapshotDiffReportOzone;
@@ -125,10 +131,10 @@
import org.apache.hadoop.ozone.util.ClosableIterator;
import org.apache.logging.log4j.util.Strings;
import org.apache.ozone.rocksdb.util.RdbUtil;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
import org.apache.ozone.rocksdb.util.SstFileSetReader;
import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
-import org.apache.ozone.rocksdiff.RocksDiffUtils;
import org.apache.ratis.util.function.UncheckedAutoCloseableSupplier;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
@@ -153,6 +159,7 @@ public class SnapshotDiffManager implements AutoCloseable {
private final ManagedRocksDB db;
private final RocksDBCheckpointDiffer differ;
private final OzoneManager ozoneManager;
+ private final OMMetadataManager activeOmMetadataManager;
private final CodecRegistry codecRegistry;
private final ManagedColumnFamilyOptions familyOptions;
// TODO: [SNAPSHOT] Use different wait time based of job status.
@@ -194,11 +201,13 @@ public class SnapshotDiffManager implements AutoCloseable
{
(SnapshotInfo fromSnapshotInfo, SnapshotInfo toSnapshotInfo) ->
fromSnapshotInfo.getSnapshotId() + DELIMITER +
toSnapshotInfo.getSnapshotId();
+ private final OmSnapshotLocalDataManager snapshotLocalDataManager;
@SuppressWarnings("parameternumber")
public SnapshotDiffManager(ManagedRocksDB db,
RocksDBCheckpointDiffer differ,
OzoneManager ozoneManager,
+ OmSnapshotLocalDataManager
snapshotLocalDataManager,
ColumnFamilyHandle snapDiffJobCfh,
ColumnFamilyHandle snapDiffReportCfh,
ManagedColumnFamilyOptions familyOptions,
@@ -206,6 +215,8 @@ public SnapshotDiffManager(ManagedRocksDB db,
this.db = db;
this.differ = differ;
this.ozoneManager = ozoneManager;
+ this.activeOmMetadataManager = ozoneManager.getMetadataManager();
+ this.snapshotLocalDataManager = snapshotLocalDataManager;
this.familyOptions = familyOptions;
this.codecRegistry = codecRegistry;
this.defaultWaitTime = ozoneManager.getConfiguration().getTimeDuration(
@@ -350,37 +361,34 @@ private void deleteDir(Path path) {
/**
* Convert from SnapshotInfo to DifferSnapshotInfo.
*/
- private DifferSnapshotInfo getDSIFromSI(SnapshotInfo snapshotInfo,
- OmSnapshot omSnapshot, final String volumeName, final String bucketName)
- throws IOException {
-
- final OMMetadataManager snapshotOMMM = omSnapshot.getMetadataManager();
- final String checkpointPath =
- snapshotOMMM.getStore().getDbLocation().getPath();
+ private static DifferSnapshotInfo getDSIFromSI(OMMetadataManager
activeOmMetadataManager,
+ SnapshotInfo snapshotInfo, OmSnapshotLocalData snapshotLocalData) throws
IOException {
final UUID snapshotId = snapshotInfo.getSnapshotId();
final long dbTxSequenceNumber = snapshotInfo.getDbTxSequenceNumber();
+ NavigableMap<Integer, List<SstFileInfo>> versionSstFiles =
snapshotLocalData.getVersionSstFileInfos()
+ .entrySet().stream().collect(toMap(Map.Entry::getKey, entry ->
entry.getValue().getSstFiles(),
+ (u, v) -> {
+ throw new IllegalStateException(String.format("Duplicate key %s",
u));
+ }, TreeMap::new));
+ if (versionSstFiles.isEmpty()) {
+ throw new IOException(String.format("No versions found corresponding to
%s", snapshotId));
+ }
return new DifferSnapshotInfo(
- checkpointPath,
- snapshotId,
- dbTxSequenceNumber,
- snapshotOMMM.getTableBucketPrefix(volumeName, bucketName),
- ((RDBStore)snapshotOMMM.getStore()).getDb().getManagedRocksDb());
+ version -> OmSnapshotManager.getSnapshotPath(activeOmMetadataManager,
snapshotId, version),
+ snapshotId, dbTxSequenceNumber, versionSstFiles);
}
@VisibleForTesting
- protected Set<String> getSSTFileListForSnapshot(OmSnapshot snapshot,
- Set<String> tablesToLookUp) {
- return RdbUtil.getSSTFilesForComparison(((RDBStore)snapshot
- .getMetadataManager().getStore()).getDb().getManagedRocksDb(),
- tablesToLookUp);
+ protected Set<SstFileInfo> getSSTFileSetForSnapshot(OmSnapshot snapshot,
Set<String> tablesToLookUp) {
+ return RdbUtil.getSSTFilesForComparison(
+
((RDBStore)snapshot.getMetadataManager().getStore()).getDb().getManagedRocksDb(),
tablesToLookUp);
}
@VisibleForTesting
- protected Map<Object, String> getSSTFileMapForSnapshot(OmSnapshot snapshot,
+ protected Map<Object, SstFileInfo> getSSTFileMapForSnapshot(OmSnapshot
snapshot,
Set<String> tablesToLookUp) throws IOException {
return RdbUtil.getSSTFilesWithInodesForComparison(((RDBStore)snapshot
- .getMetadataManager().getStore()).getDb().getManagedRocksDb(),
- tablesToLookUp);
+ .getMetadataManager().getStore()).getDb().getManagedRocksDb(),
tablesToLookUp);
}
/**
@@ -1061,10 +1069,12 @@ private void getDeltaFilesAndDiffKeysToObjectIdToKeyMap(
// tombstone is not loaded.
// TODO: [SNAPSHOT] Update Rocksdb SSTFileIterator to read tombstone
if (skipNativeDiff || !isNativeLibsLoaded) {
- Set<String> inputFiles = getSSTFileListForSnapshot(fromSnapshot,
tablesToLookUp);
- ManagedRocksDB fromDB =
((RDBStore)fromSnapshot.getMetadataManager().getStore()).getDb().getManagedRocksDb();
- RocksDiffUtils.filterRelevantSstFiles(inputFiles, tablePrefixes,
tablesToLookUp, fromDB);
- deltaFiles.addAll(inputFiles);
+ Set<SstFileInfo> inputFiles =
filterRelevantSstFiles(getSSTFileSetForSnapshot(fromSnapshot, tablesToLookUp),
+ tablesToLookUp, tablePrefixes);
+ Path fromSnapshotPath =
fromSnapshot.getMetadataManager().getStore().getDbLocation().getAbsoluteFile().toPath();
+ for (SstFileInfo sstFileInfo : inputFiles) {
+
deltaFiles.add(sstFileInfo.getFilePath(fromSnapshotPath).toAbsolutePath().toString());
+ }
}
if (LOG.isDebugEnabled()) {
LOG.debug("Computed Delta SST File Set, Total count = {} ",
deltaFiles.size());
@@ -1171,21 +1181,22 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
throws IOException {
// TODO: [SNAPSHOT] Refactor the parameter list
Optional<Set<String>> deltaFiles = Optional.empty();
-
// Check if compaction DAG is available, use that if so
if (differ != null && fsInfo != null && tsInfo != null && !useFullDiff) {
- String volume = fsInfo.getVolumeName();
- String bucket = fsInfo.getBucketName();
- // Construct DifferSnapshotInfo
- final DifferSnapshotInfo fromDSI =
- getDSIFromSI(fsInfo, fromSnapshot, volume, bucket);
- final DifferSnapshotInfo toDSI =
- getDSIFromSI(tsInfo, toSnapshot, volume, bucket);
-
- recordActivity(jobKey, SST_FILE_DELTA_DAG_WALK);
- LOG.debug("Calling RocksDBCheckpointDiffer");
- try {
- deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI,
tablesToLookUp, diffDir).map(HashSet::new);
+ try (ReadableOmSnapshotLocalDataProvider snapLocalDataProvider =
snapshotLocalDataManager.getOmSnapshotLocalData(
+ toSnapshot.getSnapshotID(), fromSnapshot.getSnapshotID())) {
+ OmSnapshotLocalData toSnapshotLocalData =
snapLocalDataProvider.getSnapshotLocalData();
+ OmSnapshotLocalData fromSnapshotLocalData =
snapLocalDataProvider.getPreviousSnapshotLocalData();
+ // Construct DifferSnapshotInfo
+ final DifferSnapshotInfo fromDSI =
getDSIFromSI(activeOmMetadataManager, fsInfo, fromSnapshotLocalData);
+ final DifferSnapshotInfo toDSI = getDSIFromSI(activeOmMetadataManager,
tsInfo, toSnapshotLocalData);
+
+ recordActivity(jobKey, SST_FILE_DELTA_DAG_WALK);
+ LOG.debug("Calling RocksDBCheckpointDiffer");
+ final Map<Integer, Integer> versionMap =
toSnapshotLocalData.getVersionSstFileInfos().entrySet()
+ .stream().collect(toMap(Map.Entry::getKey, entry ->
entry.getValue().getPreviousSnapshotVersion()));
+ deltaFiles = differ.getSSTDiffListWithFullPath(toDSI, fromDSI,
versionMap, tablePrefixInfo, tablesToLookUp,
+ diffDir).map(HashSet::new);
} catch (Exception exception) {
recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF);
LOG.warn("Failed to get SST diff file using RocksDBCheckpointDiffer. "
+
@@ -1198,15 +1209,10 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
// the slower approach.
if (!useFullDiff) {
LOG.warn("RocksDBCheckpointDiffer is not available, falling back to" +
- " slow path");
+ " slow path");
}
recordActivity(jobKey, SST_FILE_DELTA_FULL_DIFF);
- ManagedRocksDB fromDB =
((RDBStore)fromSnapshot.getMetadataManager().getStore())
- .getDb().getManagedRocksDb();
- ManagedRocksDB toDB =
((RDBStore)toSnapshot.getMetadataManager().getStore())
- .getDb().getManagedRocksDb();
- Set<String> diffFiles = getDiffFiles(fromSnapshot, toSnapshot,
tablesToLookUp);
- RocksDiffUtils.filterRelevantSstFiles(diffFiles, tablePrefixInfo,
tablesToLookUp, fromDB, toDB);
+ Set<String> diffFiles = getDiffFiles(fromSnapshot, toSnapshot,
tablesToLookUp, tablePrefixInfo);
deltaFiles = Optional.of(diffFiles);
}
@@ -1215,25 +1221,42 @@ Set<String> getDeltaFiles(OmSnapshot fromSnapshot,
toSnapshot.getSnapshotTableKey()));
}
- private Set<String> getDiffFiles(OmSnapshot fromSnapshot, OmSnapshot
toSnapshot, Set<String> tablesToLookUp) {
+ private Set<String> getDiffFiles(OmSnapshot fromSnapshot, OmSnapshot
toSnapshot, Set<String> tablesToLookUp,
+ TablePrefixInfo tablePrefixInfo) {
Set<String> diffFiles;
+ Path fromSnapshotPath =
fromSnapshot.getMetadataManager().getStore().getDbLocation().getAbsoluteFile().toPath();
+ Path toSnapshotPath =
toSnapshot.getMetadataManager().getStore().getDbLocation().getAbsoluteFile().toPath();
try {
- Map<Object, String> fromSnapshotFiles =
getSSTFileMapForSnapshot(fromSnapshot, tablesToLookUp);
- Map<Object, String> toSnapshotFiles =
getSSTFileMapForSnapshot(toSnapshot, tablesToLookUp);
- diffFiles = Stream.concat(
- fromSnapshotFiles.entrySet().stream()
- .filter(e -> !toSnapshotFiles.containsKey(e.getKey())),
- toSnapshotFiles.entrySet().stream()
- .filter(e -> !fromSnapshotFiles.containsKey(e.getKey())))
- .map(Map.Entry::getValue)
- .collect(Collectors.toSet());
+ diffFiles = new HashSet<>();
+ Map<Object, SstFileInfo> fromSnapshotFiles =
filterRelevantSstFiles(getSSTFileMapForSnapshot(fromSnapshot,
+ tablesToLookUp), tablesToLookUp, tablePrefixInfo);
+ Map<Object, SstFileInfo> toSnapshotFiles =
filterRelevantSstFiles(getSSTFileMapForSnapshot(toSnapshot,
+ tablesToLookUp), tablesToLookUp, tablePrefixInfo);
+ for (Map.Entry<Object, SstFileInfo> entry :
fromSnapshotFiles.entrySet()) {
+ if (!toSnapshotFiles.containsKey(entry.getKey())) {
+
diffFiles.add(entry.getValue().getFilePath(fromSnapshotPath).toAbsolutePath().toString());
+ }
+ }
+ for (Map.Entry<Object, SstFileInfo> entry : toSnapshotFiles.entrySet()) {
+ if (!fromSnapshotFiles.containsKey(entry.getKey())) {
+
diffFiles.add(entry.getValue().getFilePath(toSnapshotPath).toAbsolutePath().toString());
+ }
+ }
} catch (IOException e) {
// In case of exception during inode read use all files
LOG.error("Exception occurred while populating delta files for
snapDiff", e);
LOG.warn("Falling back to full file list comparison, inode-based
optimization skipped.");
+ Set<SstFileInfo> fromSnapshotFiles =
filterRelevantSstFiles(getSSTFileSetForSnapshot(fromSnapshot,
+ tablesToLookUp), tablesToLookUp, tablePrefixInfo);
+ Set<SstFileInfo> toSnapshotFiles =
filterRelevantSstFiles(getSSTFileSetForSnapshot(toSnapshot,
+ tablesToLookUp), tablesToLookUp, tablePrefixInfo);
diffFiles = new HashSet<>();
- diffFiles.addAll(getSSTFileListForSnapshot(fromSnapshot,
tablesToLookUp));
- diffFiles.addAll(getSSTFileListForSnapshot(toSnapshot, tablesToLookUp));
+ for (SstFileInfo sstFileInfo : fromSnapshotFiles) {
+
diffFiles.add(sstFileInfo.getFilePath(fromSnapshotPath).toAbsolutePath().toString());
+ }
+ for (SstFileInfo sstFileInfo : toSnapshotFiles) {
+
diffFiles.add(sstFileInfo.getFilePath(toSnapshotPath).toAbsolutePath().toString());
+ }
}
return diffFiles;
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
index 840ef6eaeb8..b484ad628c7 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/snapshot/TestSnapshotDiffManager.java
@@ -49,6 +49,7 @@
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.IN_PROGRESS;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.QUEUED;
import static
org.apache.hadoop.ozone.snapshot.SnapshotDiffResponse.JobStatus.REJECTED;
+import static
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.SST_FILE_EXTENSION;
import static org.apache.ratis.util.JavaUtils.attempt;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -84,6 +85,7 @@
import jakarta.annotation.Nonnull;
import java.io.File;
import java.io.IOException;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -103,6 +105,7 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
@@ -126,6 +129,7 @@
import org.apache.hadoop.ozone.om.OMMetadataManager;
import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OmSnapshot;
+import org.apache.hadoop.ozone.om.OmSnapshotLocalData;
import org.apache.hadoop.ozone.om.OmSnapshotManager;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
@@ -136,6 +140,7 @@
import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
import org.apache.hadoop.ozone.om.helpers.WithParentObjectId;
import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
+import
org.apache.hadoop.ozone.om.snapshot.OmSnapshotLocalDataManager.ReadableOmSnapshotLocalDataProvider;
import
org.apache.hadoop.ozone.om.snapshot.SnapshotTestUtils.StubbedPersistentMap;
import org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse;
import
org.apache.hadoop.ozone.snapshot.CancelSnapshotDiffResponse.CancelMessage;
@@ -147,6 +152,7 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ExitUtil;
import org.apache.ozone.rocksdb.util.RdbUtil;
+import org.apache.ozone.rocksdb.util.SstFileInfo;
import org.apache.ozone.rocksdb.util.SstFileSetReader;
import org.apache.ozone.rocksdiff.DifferSnapshotInfo;
import org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer;
@@ -228,6 +234,9 @@ public class TestSnapshotDiffManager {
@Mock
private RocksIterator jobTableIterator;
+ @Mock
+ private OmSnapshotLocalDataManager localDataManager;
+
@Mock
private OmSnapshotManager omSnapshotManager;
@@ -378,7 +387,7 @@ public void init() throws RocksDBException, IOException,
ExecutionException {
return snapshotCache.get(snapInfo.getSnapshotId());
});
when(ozoneManager.getOmSnapshotManager()).thenReturn(omSnapshotManager);
- snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager,
+ snapshotDiffManager = new SnapshotDiffManager(db, differ, ozoneManager,
localDataManager,
snapDiffJobTable, snapDiffReportTable, columnFamilyOptions,
codecRegistry);
when(omSnapshotManager.getDiffCleanupServiceInterval()).thenReturn(0L);
}
@@ -434,10 +443,12 @@ public void testGetDeltaFilesWithDag(int numberOfFiles)
throws IOException {
when(differ.getSSTDiffListWithFullPath(
any(DifferSnapshotInfo.class),
any(DifferSnapshotInfo.class),
+ anyMap(),
+ any(TablePrefixInfo.class),
anySet(),
eq(diffDir))
).thenReturn(Optional.of(Lists.newArrayList(randomStrings)));
-
+ mockSnapshotLocalData();
UncheckedAutoCloseableSupplier<OmSnapshot> rcFromSnapshot =
omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME,
snap1.toString());
UncheckedAutoCloseableSupplier<OmSnapshot> rcToSnapshot =
@@ -448,28 +459,33 @@ public void testGetDeltaFilesWithDag(int numberOfFiles)
throws IOException {
SnapshotInfo fromSnapshotInfo = getMockedSnapshotInfo(snap1);
SnapshotInfo toSnapshotInfo = getMockedSnapshotInfo(snap2);
when(jobTableIterator.isValid()).thenReturn(false);
- try (MockedStatic<RdbUtil> mockedRdbUtil =
Mockito.mockStatic(RdbUtil.class, Mockito.CALLS_REAL_METHODS);
- MockedStatic<RocksDiffUtils> mockedRocksDiffUtils =
Mockito.mockStatic(RocksDiffUtils.class,
- Mockito.CALLS_REAL_METHODS)) {
- mockedRdbUtil.when(() -> RdbUtil.getSSTFilesForComparison(any(), any()))
-
.thenReturn(Collections.singleton(RandomStringUtils.secure().nextAlphabetic(10)));
- mockedRocksDiffUtils.when(() ->
RocksDiffUtils.filterRelevantSstFiles(any(), any(), anySet()))
- .thenAnswer(i -> null);
- SnapshotDiffManager spy = spy(snapshotDiffManager);
- doNothing().when(spy).recordActivity(any(), any());
- doNothing().when(spy).updateProgress(anyString(), anyDouble());
- Set<String> deltaFiles = spy.getDeltaFiles(
- fromSnapshot,
- toSnapshot,
- Sets.newHashSet("cf1", "cf2"), fromSnapshotInfo,
- toSnapshotInfo, false,
- new TablePrefixInfo(Collections.emptyMap()), diffDir, diffJobKey);
- assertEquals(randomStrings, deltaFiles);
- }
+
+ SnapshotDiffManager spy = spy(snapshotDiffManager);
+ doNothing().when(spy).recordActivity(any(), any());
+ doNothing().when(spy).updateProgress(anyString(), anyDouble());
+ Set<String> deltaFiles = spy.getDeltaFiles(
+ fromSnapshot,
+ toSnapshot,
+ Sets.newHashSet("cf1", "cf2"), fromSnapshotInfo,
+ toSnapshotInfo, false,
+ new TablePrefixInfo(Collections.emptyMap()), diffDir, diffJobKey);
+ assertEquals(randomStrings, deltaFiles);
+
rcFromSnapshot.close();
rcToSnapshot.close();
}
+ private void mockSnapshotLocalData() throws IOException {
+ OmSnapshotLocalData localData = mock(OmSnapshotLocalData.class);
+ ReadableOmSnapshotLocalDataProvider snapProvider =
mock(ReadableOmSnapshotLocalDataProvider.class);
+ when(snapProvider.getPreviousSnapshotLocalData()).thenReturn(localData);
+ when(snapProvider.getSnapshotLocalData()).thenReturn(localData);
+ OmSnapshotLocalData.VersionMeta versionMeta =
mock(OmSnapshotLocalData.VersionMeta.class);
+ when(versionMeta.getSstFiles()).thenReturn(Collections.emptyList());
+ when(localData.getVersionSstFileInfos()).thenReturn(ImmutableMap.of(0,
versionMeta));
+ when(localDataManager.getOmSnapshotLocalData(any(UUID.class),
any(UUID.class))).thenReturn(snapProvider);
+ }
+
@ParameterizedTest
@CsvSource({"0,true", "1,true", "2,true", "5,true", "10,true", "100,true",
"1000,true", "10000,true", "0,false", "1,false", "2,false", "5,false",
@@ -483,26 +499,27 @@ public void testGetDeltaFilesWithFullDiff(int
numberOfFiles,
Set<String> deltaStrings = new HashSet<>();
mockedRdbUtil.when(
- () -> RdbUtil.getSSTFilesForComparison(any(), anySet()))
- .thenAnswer((Answer<Set<String>>) invocation -> {
- Set<String> retVal = IntStream.range(0, numberOfFiles)
+ () -> RdbUtil.getSSTFilesWithInodesForComparison(any(),
anySet()))
+ .thenAnswer(invocation -> {
+ Map<Object, SstFileInfo> retVal = IntStream.range(0, numberOfFiles)
.mapToObj(i -> RandomStringUtils.secure().nextAlphabetic(10))
- .collect(Collectors.toSet());
- deltaStrings.addAll(retVal);
+ .collect(Collectors.toMap(Function.identity(),
+ i -> new SstFileInfo(i, null, null, null)));
+
deltaStrings.addAll(retVal.keySet().stream().map(Object::toString).collect(Collectors.toSet()));
return retVal;
});
mockedRocksDiffUtils.when(() ->
- RocksDiffUtils.filterRelevantSstFiles(anySet(), any(), anyMap(),
anySet(), any(ManagedRocksDB.class),
- any(ManagedRocksDB.class)))
- .thenAnswer((Answer<Void>) invocationOnMock -> {
- invocationOnMock.getArgument(0, Set.class).stream()
+ RocksDiffUtils.filterRelevantSstFiles(anyMap(), anySet(), any()))
+ .thenAnswer(invocationOnMock -> {
+ invocationOnMock.getArgument(0, Map.class).entrySet().stream()
.findAny().ifPresent(val -> {
- assertTrue(deltaStrings.contains(val));
- invocationOnMock.getArgument(0, Set.class).remove(val);
- deltaStrings.remove(val);
+ Map.Entry entry = (Map.Entry) val;
+ assertTrue(deltaStrings.contains(entry.getKey()));
+ invocationOnMock.getArgument(0,
Map.class).remove(entry.getKey());
+ deltaStrings.remove(entry.getKey());
});
- return null;
+ return invocationOnMock.getArgument(0, Map.class);
});
UUID snap1 = UUID.randomUUID();
UUID snap2 = UUID.randomUUID();
@@ -515,11 +532,13 @@ public void testGetDeltaFilesWithFullDiff(int
numberOfFiles,
when(differ.getSSTDiffListWithFullPath(
any(DifferSnapshotInfo.class),
any(DifferSnapshotInfo.class),
+ anyMap(),
+ any(TablePrefixInfo.class),
anySet(),
anyString()))
- .thenReturn(Optional.ofNullable(Collections.emptyList()));
+ .thenReturn(Optional.empty());
}
-
+ mockSnapshotLocalData();
UncheckedAutoCloseableSupplier<OmSnapshot> rcFromSnapshot =
omSnapshotManager.getActiveSnapshot(VOLUME_NAME, BUCKET_NAME,
snap1.toString());
UncheckedAutoCloseableSupplier<OmSnapshot> rcToSnapshot =
@@ -542,7 +561,12 @@ public void testGetDeltaFilesWithFullDiff(int
numberOfFiles,
false,
new TablePrefixInfo(Collections.emptyMap()),
snapDiffDir.getAbsolutePath(), diffJobKey);
- assertEquals(deltaStrings, deltaFiles);
+ assertEquals(deltaStrings.stream()
+ .map(i -> dbStore.getDbLocation().toPath().resolve(i +
SST_FILE_EXTENSION).toAbsolutePath().toString())
+ .collect(Collectors.toSet()), deltaFiles);
+ if (useFullDiff && numberOfFiles > 1) {
+ assertThat(deltaFiles).isNotEmpty();
+ }
}
}
@@ -566,8 +590,7 @@ public void testGetDeltaFilesWithDifferThrowException(int
numberOfFiles)
});
mockedRocksDiffUtils.when(() ->
- RocksDiffUtils.filterRelevantSstFiles(anySet(), any(), anyMap(),
anySet(), any(ManagedRocksDB.class),
- any(ManagedRocksDB.class)))
+ RocksDiffUtils.filterRelevantSstFiles(anySet(), anySet(), any()))
.thenAnswer((Answer<Void>) invocationOnMock -> {
invocationOnMock.getArgument(0, Set.class).stream()
.findAny().ifPresent(val -> {
@@ -589,6 +612,8 @@ public void testGetDeltaFilesWithDifferThrowException(int
numberOfFiles)
.getSSTDiffListWithFullPath(
any(DifferSnapshotInfo.class),
any(DifferSnapshotInfo.class),
+ anyMap(),
+ any(TablePrefixInfo.class),
anySet(),
anyString());
@@ -606,6 +631,7 @@ public void testGetDeltaFilesWithDifferThrowException(int
numberOfFiles)
SnapshotDiffManager spy = spy(snapshotDiffManager);
doNothing().when(spy).recordActivity(any(), any());
doNothing().when(spy).updateProgress(anyString(), anyDouble());
+ mockSnapshotLocalData();
Set<String> deltaFiles = spy.getDeltaFiles(
fromSnapshot,
toSnapshot,
@@ -1541,33 +1567,36 @@ public void testGetDeltaFilesWithFullDiff() throws
IOException {
SnapshotDiffManager spy = spy(snapshotDiffManager);
UUID snap1 = UUID.randomUUID();
OmSnapshot fromSnapshot = getMockedOmSnapshot(snap1);
+ Path fromSnapshotPath =
fromSnapshot.getMetadataManager().getStore().getDbLocation().toPath();
UUID snap2 = UUID.randomUUID();
OmSnapshot toSnapshot = getMockedOmSnapshot(snap2);
+ Path toSnapshotPath =
toSnapshot.getMetadataManager().getStore().getDbLocation().toPath();
Mockito.doAnswer(invocation -> {
OmSnapshot snapshot = invocation.getArgument(0);
if (snapshot == fromSnapshot) {
- Map<Integer, String> inodeToFileMap = new HashMap<>();
- inodeToFileMap.put(1, "1.sst");
- inodeToFileMap.put(2, "2.sst");
- inodeToFileMap.put(3, "3.sst");
+ Map<Integer, SstFileInfo> inodeToFileMap = new HashMap<>();
+ inodeToFileMap.put(1, new SstFileInfo("1", null, null, null));
+ inodeToFileMap.put(2, new SstFileInfo("2", null, null, null));
+ inodeToFileMap.put(3, new SstFileInfo("3", null, null, null));
return inodeToFileMap;
}
if (snapshot == toSnapshot) {
- Map<Integer, String> inodeToFileMap = new HashMap<>();
- inodeToFileMap.put(1, "10.sst");
- inodeToFileMap.put(2, "20.sst");
- inodeToFileMap.put(4, "4.sst");
+ Map<Integer, SstFileInfo> inodeToFileMap = new HashMap<>();
+ inodeToFileMap.put(1, new SstFileInfo("10", null, null, null));
+ inodeToFileMap.put(2, new SstFileInfo("20", null, null, null));
+ inodeToFileMap.put(4, new SstFileInfo("4", null, null, null));
return inodeToFileMap;
}
return null;
- }).when(spy).getSSTFileMapForSnapshot(Mockito.any(OmSnapshot.class),
- Mockito.anySet());
+ }).when(spy).getSSTFileMapForSnapshot(Mockito.any(OmSnapshot.class),
Mockito.anySet());
doNothing().when(spy).recordActivity(any(), any());
doNothing().when(spy).updateProgress(anyString(), anyDouble());
String diffJobKey = snap1 + DELIMITER + snap2;
+
Set<String> deltaFiles = spy.getDeltaFiles(fromSnapshot, toSnapshot,
Collections.emptySet(), snapshotInfo,
snapshotInfo, true, new TablePrefixInfo(Collections.emptyMap()), null,
diffJobKey);
- Assertions.assertEquals(Sets.newHashSet("3.sst", "4.sst"), deltaFiles);
+
Assertions.assertEquals(Sets.newHashSet(fromSnapshotPath.resolve("3.sst").toAbsolutePath().toString(),
+ toSnapshotPath.resolve("4.sst").toAbsolutePath().toString()),
deltaFiles);
}
@Test
@@ -1585,7 +1614,7 @@ public void testGetSnapshotDiffReportHappyCase() throws
Exception {
anyString());
doReturn(testDeltaFiles).when(spy)
- .getSSTFileListForSnapshot(any(OmSnapshot.class), anySet());
+ .getSSTFileSetForSnapshot(any(OmSnapshot.class), anySet());
doNothing().when(spy).addToObjectIdMap(eq(keyInfoTable), eq(keyInfoTable),
any(), anyBoolean(), any(), any(), any(), any(), any(), any(),
anyString());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]