This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 275b95c1220 HDDS-7566. Refactor TestRocksDBCheckpointDiffer tests.
(#8785)
275b95c1220 is described below
commit 275b95c1220723f464376abe1b81cc989aabd4b9
Author: Wei-Chiu Chuang <[email protected]>
AuthorDate: Mon Sep 22 23:39:49 2025 -0700
HDDS-7566. Refactor TestRocksDBCheckpointDiffer tests. (#8785)
---
.../apache/ozone/rocksdiff/TestCompactionDag.java | 716 +++++++++++++++++++++
.../rocksdiff/TestRocksDBCheckpointDiffer.java | 488 +-------------
2 files changed, 719 insertions(+), 485 deletions(-)
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestCompactionDag.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestCompactionDag.java
new file mode 100644
index 00000000000..025aabd1cb2
--- /dev/null
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestCompactionDag.java
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ozone.rocksdiff;
+
+import static java.util.Arrays.asList;
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB_DEFAULT;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE_DEFAULT;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT;
+import static
org.apache.ozone.rocksdiff.RocksDBCheckpointDiffer.COMPACTION_LOG_FILE_NAME_SUFFIX;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.graph.GraphBuilder;
+import com.google.common.graph.MutableGraph;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedColumnFamilyOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedDBOptions;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRawSSTFileReader;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksDB;
+import org.apache.hadoop.hdds.utils.db.managed.ManagedRocksIterator;
+import org.apache.hadoop.ozone.lock.BootstrapStateHandler;
+import org.apache.ozone.compaction.log.CompactionLogEntry;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.RocksDBException;
+import org.slf4j.event.Level;
+
+/**
+ * Test for CompactionDag.
+ */
+public class TestCompactionDag {
+
+ private static final List<List<String>> SST_FILES_BY_LEVEL = Arrays.asList(
+ Arrays.asList("000015", "000013", "000011", "000009"),
+ Arrays.asList("000018", "000016", "000017", "000026", "000024", "000022",
+ "000020"),
+ Arrays.asList("000027", "000030", "000028", "000029", "000031", "000039",
+ "000037", "000035", "000033"),
+ Arrays.asList("000040", "000044", "000042", "000043", "000045", "000041",
+ "000046", "000054", "000052", "000050", "000048"),
+ Arrays.asList("000059", "000055", "000056", "000060", "000057", "000058")
+ );
+
+ private static final List<List<CompactionNode>> COMPACTION_NODES_BY_LEVEL =
+ SST_FILES_BY_LEVEL.stream()
+ .map(sstFiles ->
+ sstFiles.stream()
+ .map(
+ sstFile -> new CompactionNode(sstFile,
+ 1000L,
+ null, null, null
+ ))
+ .collect(Collectors.toList()))
+ .collect(Collectors.toList());
+
+ private static final String ACTIVE_DB_DIR_NAME = "./rocksdb-data";
+ private static final String METADATA_DIR_NAME = "./metadata";
+ private static final String COMPACTION_LOG_DIR_NAME = "compaction-log";
+ private static final String SST_BACK_UP_DIR_NAME = "compaction-sst-backup";
+ private File activeDbDir;
+ private File metadataDirDir;
+ private File compactionLogDir;
+ private File sstBackUpDir;
+
+ private final ExecutorService executorService =
+ Executors.newCachedThreadPool();
+ private RocksDBCheckpointDiffer rocksDBCheckpointDiffer;
+ private ManagedRocksDB activeRocksDB;
+ private ColumnFamilyHandle compactionLogTableCFHandle;
+
+ @BeforeEach
+ public void init() throws RocksDBException {
+ // Checkpoint differ log level. Set to DEBUG for verbose output
+ GenericTestUtils.setLogLevel(RocksDBCheckpointDiffer.class, Level.INFO);
+ // Test class log level. Set to DEBUG for verbose output
+ GenericTestUtils.setLogLevel(TestCompactionDag.class, Level.INFO);
+
+ activeDbDir = new File(ACTIVE_DB_DIR_NAME);
+ createDir(activeDbDir, ACTIVE_DB_DIR_NAME);
+
+ metadataDirDir = new File(METADATA_DIR_NAME);
+ createDir(metadataDirDir, METADATA_DIR_NAME);
+
+ compactionLogDir = new File(METADATA_DIR_NAME, COMPACTION_LOG_DIR_NAME);
+ createDir(compactionLogDir,
+ METADATA_DIR_NAME + "/" + COMPACTION_LOG_DIR_NAME);
+
+ sstBackUpDir = new File(METADATA_DIR_NAME, SST_BACK_UP_DIR_NAME);
+ createDir(sstBackUpDir,
+ METADATA_DIR_NAME + "/" + SST_BACK_UP_DIR_NAME);
+
+ ConfigurationSource config = mock(ConfigurationSource.class);
+
+ when(config.getTimeDuration(
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED,
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_MAX_TIME_ALLOWED_DEFAULT,
+ TimeUnit.MILLISECONDS)).thenReturn(MINUTES.toMillis(10));
+
+ when(config.getTimeDuration(
+ OZONE_OM_SNAPSHOT_COMPACTION_DAG_PRUNE_DAEMON_RUN_INTERVAL,
+ OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_DAG_DAEMON_RUN_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS)).thenReturn(0L);
+
+ when(config.getInt(
+ OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE,
+ OZONE_OM_SNAPSHOT_PRUNE_COMPACTION_BACKUP_BATCH_SIZE_DEFAULT))
+ .thenReturn(2000);
+
+ when(config.getBoolean(
+ OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB,
+ OZONE_OM_SNAPSHOT_LOAD_NATIVE_LIB_DEFAULT)).thenReturn(true);
+
+ try (MockedStatic<ManagedRawSSTFileReader> mockedRawSSTReader =
+ Mockito.mockStatic(ManagedRawSSTFileReader.class)) {
+ mockedRawSSTReader.when(ManagedRawSSTFileReader::loadLibrary)
+ .thenReturn(true);
+ rocksDBCheckpointDiffer = new RocksDBCheckpointDiffer(METADATA_DIR_NAME,
+ SST_BACK_UP_DIR_NAME,
+ COMPACTION_LOG_DIR_NAME,
+ ACTIVE_DB_DIR_NAME,
+ config);
+ }
+
+ ManagedColumnFamilyOptions cfOpts = new ManagedColumnFamilyOptions();
+ cfOpts.optimizeUniversalStyleCompaction();
+ List<ColumnFamilyDescriptor> cfDescriptors =
+ TestRocksDBCheckpointDiffer.getCFDescriptorList(cfOpts);
+ List<ColumnFamilyHandle> cfHandles = new ArrayList<>();
+ ManagedDBOptions dbOptions = new ManagedDBOptions();
+ dbOptions.setCreateIfMissing(true);
+ dbOptions.setCreateMissingColumnFamilies(true);
+
+ rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
+ activeRocksDB = ManagedRocksDB.open(dbOptions, ACTIVE_DB_DIR_NAME,
+ cfDescriptors, cfHandles);
+ compactionLogTableCFHandle = cfHandles.get(4);
+
+ rocksDBCheckpointDiffer.setCompactionLogTableCFHandle(cfHandles.get(4));
+ rocksDBCheckpointDiffer.setActiveRocksDB(activeRocksDB);
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
+ }
+
+ private void createDir(File file, String filePath) {
+ // Remove already existed dir.
+ if (file.exists()) {
+ deleteDirectory(file);
+ }
+
+ // Create new Dir.
+ if (!file.mkdirs()) {
+ fail("Error in creating directory: " + filePath);
+ }
+ }
+
+ private boolean deleteDirectory(File directoryToBeDeleted) {
+ File[] allContents = directoryToBeDeleted.listFiles();
+ if (allContents != null) {
+ for (File file : allContents) {
+ if (!deleteDirectory(file)) {
+ return false;
+ }
+ }
+ }
+ return directoryToBeDeleted.delete();
+ }
+
+ @AfterEach
+ public void cleanUp() {
+ IOUtils.closeQuietly(rocksDBCheckpointDiffer);
+ IOUtils.closeQuietly(compactionLogTableCFHandle);
+ IOUtils.closeQuietly(activeRocksDB);
+ deleteDirectory(compactionLogDir);
+ deleteDirectory(sstBackUpDir);
+ deleteDirectory(metadataDirDir);
+ deleteDirectory(activeDbDir);
+ }
+
+ /**
+ * Creates a backward compaction DAG from a list of level nodes.
+ * It assumes that at each level files get compacted to the half of number
+ * of files at the next level.
+ * e.g. if level-1 has 7 files and level-2 has 9 files, so first 4 files
+ * at level-2 are from compaction of level-1 and rests are new.
+ */
+ private static MutableGraph<CompactionNode> createBackwardDagFromLevelNodes(
+ int fromLevel,
+ int toLevel
+ ) {
+ MutableGraph<CompactionNode> dag = GraphBuilder.directed().build();
+
+ if (fromLevel == toLevel) {
+ COMPACTION_NODES_BY_LEVEL.get(fromLevel).forEach(dag::addNode);
+ return dag;
+ }
+
+ for (int level = fromLevel; level < toLevel; level++) {
+ List<CompactionNode> currentLevel = COMPACTION_NODES_BY_LEVEL.get(level);
+ List<CompactionNode> nextLevel = COMPACTION_NODES_BY_LEVEL.get(level +
1);
+
+ for (CompactionNode compactionNode : currentLevel) {
+ for (int j = 0; j < nextLevel.size(); j++) {
+ dag.addNode(compactionNode);
+ dag.addNode(nextLevel.get(j));
+
+ int child = nextLevel.size();
+ if (level < COMPACTION_NODES_BY_LEVEL.size() - 2) {
+ child /= 2;
+ }
+
+ if (j < child) {
+ dag.putEdge(compactionNode, nextLevel.get(j));
+ }
+ }
+ }
+ }
+
+ return dag;
+ }
+
+ /**
+ * Creates a forward compaction DAG from a list of level nodes.
+ * It assumes that at each level first half of the files are from the
+ * compaction of the previous level.
+ * e.g. if level-1 has 7 files and level-2 has 9 files, so first 4 files
+ * at level-2 are from compaction of level-1 and rests are new.
+ */
+ private static MutableGraph<CompactionNode> createForwardDagFromLevelNodes(
+ int fromLevel,
+ int toLevel
+ ) {
+ MutableGraph<CompactionNode> dag = GraphBuilder.directed().build();
+
+ if (fromLevel == toLevel) {
+ COMPACTION_NODES_BY_LEVEL.get(fromLevel).forEach(dag::addNode);
+ return dag;
+ }
+
+ dag = GraphBuilder.directed().build();
+ for (int level = fromLevel; level > toLevel; level--) {
+ List<CompactionNode> currentLevel = COMPACTION_NODES_BY_LEVEL.get(level);
+ List<CompactionNode> nextLevel = COMPACTION_NODES_BY_LEVEL.get(level -
1);
+
+ for (int i = 0; i < currentLevel.size(); i++) {
+ for (CompactionNode compactionNode : nextLevel) {
+ dag.addNode(currentLevel.get(i));
+ dag.addNode(compactionNode);
+
+ int parent = currentLevel.size();
+ if (level < COMPACTION_NODES_BY_LEVEL.size() - 1) {
+ parent /= 2;
+ }
+
+ if (i < parent) {
+ dag.putEdge(currentLevel.get(i), compactionNode);
+ }
+ }
+ }
+ }
+
+ return dag;
+ }
+
+ /**
+ * Test cases for pruneBackwardDag.
+ */
+ private static Stream<Arguments> pruneBackwardDagScenarios() {
+ Set<String> level0Files = new HashSet<>(SST_FILES_BY_LEVEL.get(0));
+ Set<String> level1Files = new HashSet<>(SST_FILES_BY_LEVEL.get(1));
+ Set<String> level2Files = new HashSet<>(SST_FILES_BY_LEVEL.get(2));
+ Set<String> level3Files = new HashSet<>(SST_FILES_BY_LEVEL.get(3));
+
+ level1Files.addAll(level0Files);
+ level2Files.addAll(level1Files);
+ level3Files.addAll(level2Files);
+
+ return Stream.of(
+ Arguments.of("Remove level 0 from backward DAG",
+ createBackwardDagFromLevelNodes(0, 4),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(0)),
+ createBackwardDagFromLevelNodes(1, 4),
+ level0Files
+ ),
+ Arguments.of("Remove level 1 from backward DAG",
+ createBackwardDagFromLevelNodes(0, 4),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(1)),
+ createBackwardDagFromLevelNodes(2, 4),
+ level1Files
+ ),
+ Arguments.of("Remove level 2 from backward DAG",
+ createBackwardDagFromLevelNodes(0, 4),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(2)),
+ createBackwardDagFromLevelNodes(3, 4),
+ level2Files
+ ),
+ Arguments.of("Remove level 3 from backward DAG",
+ createBackwardDagFromLevelNodes(0, 4),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(3)),
+ createBackwardDagFromLevelNodes(4, 4),
+ level3Files
+ )
+ );
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("pruneBackwardDagScenarios")
+ public void testPruneBackwardDag(String description,
+ MutableGraph<CompactionNode> originalDag,
+ Set<CompactionNode> levelToBeRemoved,
+ MutableGraph<CompactionNode> expectedDag,
+ Set<String> expectedFileNodesRemoved) {
+ CompactionDag compactionDag = new CompactionDag();
+ Set<String> actualFileNodesRemoved =
+ compactionDag.pruneBackwardDag(originalDag, levelToBeRemoved);
+ assertEquals(expectedDag, originalDag);
+ assertEquals(actualFileNodesRemoved, expectedFileNodesRemoved);
+ }
+
+ /**
+ * Test cases for pruneBackwardDag.
+ */
+ private static Stream<Arguments> pruneForwardDagScenarios() {
+ Set<String> level0Files = new HashSet<>(SST_FILES_BY_LEVEL.get(0));
+ Set<String> level1Files = new HashSet<>(SST_FILES_BY_LEVEL.get(1));
+ Set<String> level2Files = new HashSet<>(SST_FILES_BY_LEVEL.get(2));
+ Set<String> level3Files = new HashSet<>(SST_FILES_BY_LEVEL.get(3));
+
+ level1Files.addAll(level0Files);
+ level2Files.addAll(level1Files);
+ level3Files.addAll(level2Files);
+
+ return Stream.of(
+ Arguments.of("Remove level 0 from forward DAG",
+ createForwardDagFromLevelNodes(4, 0),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(0)),
+ createForwardDagFromLevelNodes(4, 1),
+ level0Files
+ ),
+ Arguments.of("Remove level 1 from forward DAG",
+ createForwardDagFromLevelNodes(4, 0),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(1)),
+ createForwardDagFromLevelNodes(4, 2),
+ level1Files
+ ),
+ Arguments.of("Remove level 2 from forward DAG",
+ createForwardDagFromLevelNodes(4, 0),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(2)),
+ createForwardDagFromLevelNodes(4, 3),
+ level2Files
+ ),
+ Arguments.of("Remove level 3 from forward DAG",
+ createForwardDagFromLevelNodes(4, 0),
+ new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(3)),
+ createForwardDagFromLevelNodes(4, 4),
+ level3Files
+ )
+ );
+ }
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("pruneForwardDagScenarios")
+ public void testPruneForwardDag(String description,
+ MutableGraph<CompactionNode> originalDag,
+ Set<CompactionNode> levelToBeRemoved,
+ MutableGraph<CompactionNode> expectedDag,
+ Set<String> expectedFileNodesRemoved) {
+ CompactionDag compactionDag = new CompactionDag();
+ Set<String> actualFileNodesRemoved =
+ compactionDag.pruneForwardDag(originalDag, levelToBeRemoved);
+ assertEquals(expectedDag, originalDag);
+ assertEquals(actualFileNodesRemoved, expectedFileNodesRemoved);
+ }
+
+ @SuppressWarnings("methodlength")
+ private static Stream<Arguments> compactionDagPruningScenarios() {
+ long currentTimeMillis = System.currentTimeMillis();
+
+ String compactionLogFile0 = "S 1000 snapshotId0 " +
+ (currentTimeMillis - MINUTES.toMillis(30)) + " \n";
+ String compactionLogFile1 = "C 1500 000015,000013,000011,000009:000018," +
+ "000016,000017\n"
+ + "S 2000 snapshotId1 " +
+ (currentTimeMillis - MINUTES.toMillis(24)) + " \n";
+
+ String compactionLogFile2 = "C 2500 000018,000016,000017,000026,000024," +
+ "000022,000020:000027,000030,000028,000029,000031,000029\n"
+ + "S 3000 snapshotId2 " +
+ (currentTimeMillis - MINUTES.toMillis(18)) + " \n";
+
+ String compactionLogFile3 = "C 3500 000027,000030,000028,000031,000029," +
+ "000039,000037,000035,000033:000040,000044,000042,000043,000046," +
+ "000041,000045\n"
+ + "S 4000 snapshotId3 " +
+ (currentTimeMillis - MINUTES.toMillis(12)) + " \n";
+
+ String compactionLogFile4 = "C 4500 000040,000044,000042,000043,000046," +
+ "000041,000045,000054,000052,000050,000048:000059,000055,000056," +
+ "000060,000057,000058\n"
+ + "S 5000 snapshotId4 " +
+ (currentTimeMillis - MINUTES.toMillis(6)) + " \n";
+
+ String compactionLogFileWithoutSnapshot1 = "C 1500 000015,000013,000011," +
+ "000009:000018,000016,000017\n"
+ + "C 2000 000018,000016,000017,000026,000024,000022,000020" +
+ ":000027,000030,000028,000031,000029\n";
+
+ String compactionLogFileWithoutSnapshot2 = "C 4500 000040,000044,000042," +
+ "000043,000046,000041,000045,000054,000052,000050,000048:000059," +
+ "000055,000056,000060,000057,000058\n";
+
+ String compactionLogFileOnlyWithSnapshot1 =
+ "S 3000 snapshotIdWithoutCompaction1 " +
+ (currentTimeMillis - MINUTES.toMillis(18)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot2 =
+ "S 3000 snapshotIdWithoutCompaction2 " +
+ (currentTimeMillis - MINUTES.toMillis(15)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot3 =
+ "S 3000 snapshotIdWithoutCompaction3 " +
+ (currentTimeMillis - MINUTES.toMillis(12)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot4 =
+ "S 3000 snapshotIdWithoutCompaction4 " +
+ (currentTimeMillis - MINUTES.toMillis(9)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot5 =
+ "S 3000 snapshotIdWithoutCompaction5 " +
+ (currentTimeMillis - MINUTES.toMillis(6)) + " \n";
+
+ String compactionLogFileOnlyWithSnapshot6 =
+ "S 3000 snapshotIdWithoutCompaction6 " +
+ (currentTimeMillis - MINUTES.toMillis(3)) + " \n";
+
+ Set<String> expectedNodes = ImmutableSet.of("000059", "000055", "000056",
+ "000060", "000057", "000058");
+
+ return Stream.of(
+ Arguments.of("Each compaction log file has only one snapshot and one" +
+ " compaction statement except first log file.",
+ Arrays.asList(compactionLogFile0, compactionLogFile1,
+ compactionLogFile2, compactionLogFile3, compactionLogFile4),
+ null,
+ expectedNodes,
+ 4,
+ 0
+ ),
+ Arguments.of("Compaction log doesn't have snapshot because OM" +
+ " restarted. Restart happened before snapshot to be deleted.",
+ Arrays.asList(compactionLogFile0,
+ compactionLogFileWithoutSnapshot1,
+ compactionLogFile3,
+ compactionLogFile4),
+ null,
+ expectedNodes,
+ 4,
+ 0
+ ),
+ Arguments.of("Compaction log doesn't have snapshot because OM" +
+ " restarted. Restart happened after snapshot to be deleted.",
+ Arrays.asList(compactionLogFile0, compactionLogFile1,
+ compactionLogFile2, compactionLogFile3,
+ compactionLogFileWithoutSnapshot2,
+ compactionLogFileOnlyWithSnapshot4),
+ null,
+ expectedNodes,
+ 4,
+ 0
+ ),
+ Arguments.of("No compaction happened in between two snapshots.",
+ Arrays.asList(compactionLogFile0, compactionLogFile1,
+ compactionLogFile2, compactionLogFile3,
+ compactionLogFileOnlyWithSnapshot1,
+ compactionLogFileOnlyWithSnapshot2, compactionLogFile4),
+ null,
+ expectedNodes,
+ 4,
+ 0
+ ),
+ Arguments.of("Only contains snapshots but no compaction.",
+ Arrays.asList(compactionLogFileOnlyWithSnapshot1,
+ compactionLogFileOnlyWithSnapshot2,
+ compactionLogFileOnlyWithSnapshot3,
+ compactionLogFileOnlyWithSnapshot4,
+ compactionLogFileOnlyWithSnapshot5,
+ compactionLogFileOnlyWithSnapshot6),
+ null,
+ Collections.emptySet(),
+ 0,
+ 0
+ ),
+ Arguments.of("No file exists because compaction has not happened" +
+ " and snapshot is not taken.",
+ Collections.emptyList(),
+ null,
+ Collections.emptySet(),
+ 0,
+ 0
+ ),
+ Arguments.of("When compaction table is used case 1.",
+ null,
+ asList(TestRocksDBCheckpointDiffer.createCompactionEntry(1500,
+ (currentTimeMillis - MINUTES.toMillis(24)),
+ asList("000015", "000013", "000011", "000009"),
+ asList("000018", "000016", "000017")),
+ TestRocksDBCheckpointDiffer.createCompactionEntry(2500,
+ (currentTimeMillis - MINUTES.toMillis(20)),
+ asList("000018", "000016", "000017", "000026", "000024",
+ "000022", "000020"),
+ asList("000027", "000030", "000028", "000031", "000029")),
+ TestRocksDBCheckpointDiffer.createCompactionEntry(3500,
+ (currentTimeMillis - MINUTES.toMillis(16)),
+ asList("000027", "000030", "000028", "000031", "000029",
+ "000039", "000037", "000035", "000033"),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045")),
+ TestRocksDBCheckpointDiffer.createCompactionEntry(4500,
+ (currentTimeMillis - MINUTES.toMillis(12)),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045", "000054", "000052", "000050",
+ "000048"),
+ asList("000059", "000055", "000056", "000060", "000057",
+ "000058"))),
+ expectedNodes,
+ 4,
+ 0
+ ),
+ Arguments.of("When compaction table is used case 2.",
+ null,
+ asList(TestRocksDBCheckpointDiffer.createCompactionEntry(1500,
+ (currentTimeMillis - MINUTES.toMillis(24)),
+ asList("000015", "000013", "000011", "000009"),
+ asList("000018", "000016", "000017")),
+ TestRocksDBCheckpointDiffer.createCompactionEntry(2500,
+ (currentTimeMillis - MINUTES.toMillis(18)),
+ asList("000018", "000016", "000017", "000026", "000024",
+ "000022", "000020"),
+ asList("000027", "000030", "000028", "000031", "000029")),
+ TestRocksDBCheckpointDiffer.createCompactionEntry(3500,
+ (currentTimeMillis - MINUTES.toMillis(12)),
+ asList("000027", "000030", "000028", "000031", "000029",
+ "000039", "000037", "000035", "000033"),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045")),
+ TestRocksDBCheckpointDiffer.createCompactionEntry(4500,
+ (currentTimeMillis - MINUTES.toMillis(6)),
+ asList("000040", "000044", "000042", "000043", "000046",
+ "000041", "000045", "000054", "000052", "000050",
+ "000048"),
+ asList("000059", "000055", "000056", "000060", "000057",
+ "000058"))),
+ ImmutableSet.of("000059", "000055", "000056", "000060", "000057",
+ "000058", "000040", "000044", "000042", "000043", "000046",
+ "000041", "000045", "000054", "000052", "000050", "000048"),
+ 4,
+ 1
+ )
+ );
+ }
+
+ /**
+ * End-to-end test for snapshot's compaction history pruning.
+ */
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("compactionDagPruningScenarios")
+ public void testPruneOlderSnapshotsWithCompactionHistory(
+ String description,
+ List<String> compactionLogs,
+ List<CompactionLogEntry> compactionLogEntries,
+ Set<String> expectedNodes,
+ int expectedNumberOfLogEntriesBeforePruning,
+ int expectedNumberOfLogEntriesAfterPruning
+ ) throws IOException, ExecutionException, InterruptedException,
+ TimeoutException {
+ List<File> filesCreated = new ArrayList<>();
+
+ if (compactionLogs != null) {
+ for (int i = 0; i < compactionLogs.size(); i++) {
+ String compactionFileName = METADATA_DIR_NAME + "/" +
+ COMPACTION_LOG_DIR_NAME
+ + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
+ File compactionFile = new File(compactionFileName);
+ Files.write(compactionFile.toPath(),
+ compactionLogs.get(i).getBytes(StandardCharsets.UTF_8));
+ filesCreated.add(compactionFile);
+ }
+ } else if (compactionLogEntries != null) {
+ compactionLogEntries.forEach(entry ->
+ rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
+ } else {
+ throw new IllegalArgumentException("One of compactionLog or" +
+ " compactionLogEntries should be present.");
+ }
+
+ rocksDBCheckpointDiffer.loadAllCompactionLogs();
+ assertEquals(expectedNumberOfLogEntriesBeforePruning,
+ countEntriesInCompactionLogTable());
+ waitForLock(rocksDBCheckpointDiffer,
+ RocksDBCheckpointDiffer::pruneOlderSnapshotsWithCompactionHistory);
+
+ Set<String> actualNodesInForwardDAG = rocksDBCheckpointDiffer
+ .getForwardCompactionDAG()
+ .nodes()
+ .stream()
+ .map(CompactionNode::getFileName)
+ .collect(Collectors.toSet());
+
+ Set<String> actualNodesBackwardDAG = rocksDBCheckpointDiffer
+ .getBackwardCompactionDAG()
+ .nodes()
+ .stream()
+ .map(CompactionNode::getFileName)
+ .collect(Collectors.toSet());
+
+ assertEquals(expectedNodes, actualNodesInForwardDAG);
+ assertEquals(expectedNodes, actualNodesBackwardDAG);
+
+ for (int i = 0; compactionLogs != null && i < compactionLogs.size(); i++) {
+ File compactionFile = filesCreated.get(i);
+ assertFalse(compactionFile.exists());
+ }
+
+ assertEquals(expectedNumberOfLogEntriesAfterPruning,
+ countEntriesInCompactionLogTable());
+ }
+
+ private int countEntriesInCompactionLogTable() {
+ try (ManagedRocksIterator iterator = new ManagedRocksIterator(
+ activeRocksDB.get().newIterator(compactionLogTableCFHandle))) {
+ iterator.get().seekToFirst();
+ int count = 0;
+ while (iterator.get().isValid()) {
+ iterator.get().next();
+ count++;
+ }
+ return count;
+ }
+ }
+
+ // Take the lock, confirm that the consumer doesn't finish
+ // then release the lock and confirm that the consumer does finish.
+ private void waitForLock(RocksDBCheckpointDiffer differ,
+ Consumer<RocksDBCheckpointDiffer> c)
+ throws InterruptedException, ExecutionException, TimeoutException {
+
+ Future<Boolean> future;
+ // Take the lock and start the consumer.
+ try (BootstrapStateHandler.Lock lock =
+ differ.getBootstrapStateLock().lock()) {
+ future = executorService.submit(
+ () -> {
+ c.accept(differ);
+ return true;
+ });
+ // Confirm that the consumer doesn't finish with lock taken.
+ assertThrows(TimeoutException.class,
+ () -> future.get(1000, TimeUnit.MILLISECONDS));
+ }
+ // Confirm consumer finishes when unlocked.
+ assertTrue(future.get(100, TimeUnit.MILLISECONDS));
+ }
+}
diff --git
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
index bc21904f9c0..fbdb9ea2198 100644
---
a/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
+++
b/hadoop-hdds/rocksdb-checkpoint-differ/src/test/java/org/apache/ozone/rocksdiff/TestRocksDBCheckpointDiffer.java
@@ -48,7 +48,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import com.google.common.graph.GraphBuilder;
import com.google.common.graph.MutableGraph;
import java.io.File;
import java.io.FileWriter;
@@ -134,29 +133,6 @@ public class TestRocksDBCheckpointDiffer {
private static final Logger LOG =
LoggerFactory.getLogger(TestRocksDBCheckpointDiffer.class);
- private static final List<List<String>> SST_FILES_BY_LEVEL = Arrays.asList(
- Arrays.asList("000015", "000013", "000011", "000009"),
- Arrays.asList("000018", "000016", "000017", "000026", "000024", "000022",
- "000020"),
- Arrays.asList("000027", "000030", "000028", "000029", "000031", "000039",
- "000037", "000035", "000033"),
- Arrays.asList("000040", "000044", "000042", "000043", "000045", "000041",
- "000046", "000054", "000052", "000050", "000048"),
- Arrays.asList("000059", "000055", "000056", "000060", "000057", "000058")
- );
-
- private static final List<List<CompactionNode>> COMPACTION_NODES_BY_LEVEL =
- SST_FILES_BY_LEVEL.stream()
- .map(sstFiles ->
- sstFiles.stream()
- .map(
- sstFile -> new CompactionNode(sstFile,
- 1000L,
- null, null, null
- ))
- .collect(Collectors.toList()))
- .collect(Collectors.toList());
-
private final List<CompactionLogEntry> compactionLogEntryList =
Arrays.asList(
new CompactionLogEntry(101, System.currentTimeMillis(),
Arrays.asList(
@@ -1200,474 +1176,16 @@ private void printMutableGraphFromAGivenNode(
}
}
- /**
- * Creates a backward compaction DAG from a list of level nodes.
- * It assumes that at each level files get compacted to the half of number
- * of files at the next level.
- * e.g. if level-1 has 7 files and level-2 has 9 files, so first 4 files
- * at level-2 are from compaction of level-1 and rests are new.
- */
- private static MutableGraph<CompactionNode> createBackwardDagFromLevelNodes(
- int fromLevel,
- int toLevel
- ) {
- MutableGraph<CompactionNode> dag = GraphBuilder.directed().build();
-
- if (fromLevel == toLevel) {
- COMPACTION_NODES_BY_LEVEL.get(fromLevel).forEach(dag::addNode);
- return dag;
- }
-
- for (int level = fromLevel; level < toLevel; level++) {
- List<CompactionNode> currentLevel = COMPACTION_NODES_BY_LEVEL.get(level);
- List<CompactionNode> nextLevel = COMPACTION_NODES_BY_LEVEL.get(level +
1);
-
- for (CompactionNode compactionNode : currentLevel) {
- for (int j = 0; j < nextLevel.size(); j++) {
- dag.addNode(compactionNode);
- dag.addNode(nextLevel.get(j));
-
- int child = nextLevel.size();
- if (level < COMPACTION_NODES_BY_LEVEL.size() - 2) {
- child /= 2;
- }
-
- if (j < child) {
- dag.putEdge(compactionNode, nextLevel.get(j));
- }
- }
- }
- }
-
- return dag;
- }
-
- /**
- * Creates a forward compaction DAG from a list of level nodes.
- * It assumes that at each level first half of the files are from the
- * compaction of the previous level.
- * e.g. if level-1 has 7 files and level-2 has 9 files, so first 4 files
- * at level-2 are from compaction of level-1 and rests are new.
- */
- private static MutableGraph<CompactionNode> createForwardDagFromLevelNodes(
- int fromLevel,
- int toLevel
- ) {
- MutableGraph<CompactionNode> dag = GraphBuilder.directed().build();
-
- if (fromLevel == toLevel) {
- COMPACTION_NODES_BY_LEVEL.get(fromLevel).forEach(dag::addNode);
- return dag;
- }
-
- dag = GraphBuilder.directed().build();
- for (int level = fromLevel; level > toLevel; level--) {
- List<CompactionNode> currentLevel = COMPACTION_NODES_BY_LEVEL.get(level);
- List<CompactionNode> nextLevel = COMPACTION_NODES_BY_LEVEL.get(level -
1);
-
- for (int i = 0; i < currentLevel.size(); i++) {
- for (CompactionNode compactionNode : nextLevel) {
- dag.addNode(currentLevel.get(i));
- dag.addNode(compactionNode);
-
- int parent = currentLevel.size();
- if (level < COMPACTION_NODES_BY_LEVEL.size() - 1) {
- parent /= 2;
- }
-
- if (i < parent) {
- dag.putEdge(currentLevel.get(i), compactionNode);
- }
- }
- }
- }
-
- return dag;
- }
-
- /**
- * Test cases for pruneBackwardDag.
- */
- private static Stream<Arguments> pruneBackwardDagScenarios() {
- Set<String> level0Files = new HashSet<>(SST_FILES_BY_LEVEL.get(0));
- Set<String> level1Files = new HashSet<>(SST_FILES_BY_LEVEL.get(1));
- Set<String> level2Files = new HashSet<>(SST_FILES_BY_LEVEL.get(2));
- Set<String> level3Files = new HashSet<>(SST_FILES_BY_LEVEL.get(3));
-
- level1Files.addAll(level0Files);
- level2Files.addAll(level1Files);
- level3Files.addAll(level2Files);
-
- return Stream.of(
- Arguments.of("Remove level 0 from backward DAG",
- createBackwardDagFromLevelNodes(0, 4),
- new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(0)),
- createBackwardDagFromLevelNodes(1, 4),
- level0Files
- ),
- Arguments.of("Remove level 1 from backward DAG",
- createBackwardDagFromLevelNodes(0, 4),
- new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(1)),
- createBackwardDagFromLevelNodes(2, 4),
- level1Files
- ),
- Arguments.of("Remove level 2 from backward DAG",
- createBackwardDagFromLevelNodes(0, 4),
- new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(2)),
- createBackwardDagFromLevelNodes(3, 4),
- level2Files
- ),
- Arguments.of("Remove level 3 from backward DAG",
- createBackwardDagFromLevelNodes(0, 4),
- new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(3)),
- createBackwardDagFromLevelNodes(4, 4),
- level3Files
- )
- );
- }
-
- @ParameterizedTest(name = "{0}")
- @MethodSource("pruneBackwardDagScenarios")
- public void testPruneBackwardDag(String description,
- MutableGraph<CompactionNode> originalDag,
- Set<CompactionNode> levelToBeRemoved,
- MutableGraph<CompactionNode> expectedDag,
- Set<String> expectedFileNodesRemoved) {
- Set<String> actualFileNodesRemoved =
- rocksDBCheckpointDiffer.pruneBackwardDag(originalDag,
levelToBeRemoved);
- assertEquals(expectedDag, originalDag);
- assertEquals(actualFileNodesRemoved, expectedFileNodesRemoved);
- }
-
- /**
- * Test cases for pruneBackwardDag.
- */
- private static Stream<Arguments> pruneForwardDagScenarios() {
- Set<String> level0Files = new HashSet<>(SST_FILES_BY_LEVEL.get(0));
- Set<String> level1Files = new HashSet<>(SST_FILES_BY_LEVEL.get(1));
- Set<String> level2Files = new HashSet<>(SST_FILES_BY_LEVEL.get(2));
- Set<String> level3Files = new HashSet<>(SST_FILES_BY_LEVEL.get(3));
-
- level1Files.addAll(level0Files);
- level2Files.addAll(level1Files);
- level3Files.addAll(level2Files);
-
- return Stream.of(
- Arguments.of("Remove level 0 from forward DAG",
- createForwardDagFromLevelNodes(4, 0),
- new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(0)),
- createForwardDagFromLevelNodes(4, 1),
- level0Files
- ),
- Arguments.of("Remove level 1 from forward DAG",
- createForwardDagFromLevelNodes(4, 0),
- new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(1)),
- createForwardDagFromLevelNodes(4, 2),
- level1Files
- ),
- Arguments.of("Remove level 2 from forward DAG",
- createForwardDagFromLevelNodes(4, 0),
- new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(2)),
- createForwardDagFromLevelNodes(4, 3),
- level2Files
- ),
- Arguments.of("Remove level 3 from forward DAG",
- createForwardDagFromLevelNodes(4, 0),
- new HashSet<>(COMPACTION_NODES_BY_LEVEL.get(3)),
- createForwardDagFromLevelNodes(4, 4),
- level3Files
- )
- );
- }
-
- @ParameterizedTest(name = "{0}")
- @MethodSource("pruneForwardDagScenarios")
- public void testPruneForwardDag(String description,
- MutableGraph<CompactionNode> originalDag,
- Set<CompactionNode> levelToBeRemoved,
- MutableGraph<CompactionNode> expectedDag,
- Set<String> expectedFileNodesRemoved) {
- Set<String> actualFileNodesRemoved =
- rocksDBCheckpointDiffer.pruneForwardDag(originalDag, levelToBeRemoved);
- assertEquals(expectedDag, originalDag);
- assertEquals(actualFileNodesRemoved, expectedFileNodesRemoved);
- }
-
- @SuppressWarnings("methodlength")
- private static Stream<Arguments> compactionDagPruningScenarios() {
- long currentTimeMillis = System.currentTimeMillis();
-
- String compactionLogFile0 = "S 1000 snapshotId0 " +
- (currentTimeMillis - MINUTES.toMillis(30)) + " \n";
- String compactionLogFile1 = "C 1500 000015,000013,000011,000009:000018," +
- "000016,000017\n"
- + "S 2000 snapshotId1 " +
- (currentTimeMillis - MINUTES.toMillis(24)) + " \n";
-
- String compactionLogFile2 = "C 2500 000018,000016,000017,000026,000024," +
- "000022,000020:000027,000030,000028,000031,000029\n"
- + "S 3000 snapshotId2 " +
- (currentTimeMillis - MINUTES.toMillis(18)) + " \n";
-
- String compactionLogFile3 = "C 3500 000027,000030,000028,000031,000029," +
- "000039,000037,000035,000033:000040,000044,000042,000043,000046," +
- "000041,000045\n"
- + "S 4000 snapshotId3 " +
- (currentTimeMillis - MINUTES.toMillis(12)) + " \n";
-
- String compactionLogFile4 = "C 4500 000040,000044,000042,000043,000046," +
- "000041,000045,000054,000052,000050,000048:000059,000055,000056," +
- "000060,000057,000058\n"
- + "S 5000 snapshotId4 " +
- (currentTimeMillis - MINUTES.toMillis(6)) + " \n";
-
- String compactionLogFileWithoutSnapshot1 = "C 1500 000015,000013,000011," +
- "000009:000018,000016,000017\n" +
- "C 2000 000018,000016,000017,000026,000024,000022,000020" +
- ":000027,000030,000028,000031,000029\n";
-
- String compactionLogFileWithoutSnapshot2 = "C 4500 000040,000044,000042," +
- "000043,000046,000041,000045,000054,000052,000050,000048:000059," +
- "000055,000056,000060,000057,000058\n";
-
- String compactionLogFileOnlyWithSnapshot1 =
- "S 3000 snapshotIdWithoutCompaction1 " +
- (currentTimeMillis - MINUTES.toMillis(18)) + " \n";
-
- String compactionLogFileOnlyWithSnapshot2 =
- "S 3000 snapshotIdWithoutCompaction2 " +
- (currentTimeMillis - MINUTES.toMillis(15)) + " \n";
-
- String compactionLogFileOnlyWithSnapshot3 =
- "S 3000 snapshotIdWithoutCompaction3 " +
- (currentTimeMillis - MINUTES.toMillis(12)) + " \n";
-
- String compactionLogFileOnlyWithSnapshot4 =
- "S 3000 snapshotIdWithoutCompaction4 " +
- (currentTimeMillis - MINUTES.toMillis(9)) + " \n";
-
- String compactionLogFileOnlyWithSnapshot5 =
- "S 3000 snapshotIdWithoutCompaction5 " +
- (currentTimeMillis - MINUTES.toMillis(6)) + " \n";
-
- String compactionLogFileOnlyWithSnapshot6 =
- "S 3000 snapshotIdWithoutCompaction6 " +
- (currentTimeMillis - MINUTES.toMillis(3)) + " \n";
-
- Set<String> expectedNodes = ImmutableSet.of("000059", "000055", "000056",
- "000060", "000057", "000058");
-
- return Stream.of(
- Arguments.of("Each compaction log file has only one snapshot and one" +
- " compaction statement except first log file.",
- Arrays.asList(compactionLogFile0, compactionLogFile1,
- compactionLogFile2, compactionLogFile3, compactionLogFile4),
- null,
- expectedNodes,
- 4,
- 0
- ),
- Arguments.of("Compaction log doesn't have snapshot because OM" +
- " restarted. Restart happened before snapshot to be deleted.",
- Arrays.asList(compactionLogFile0,
- compactionLogFileWithoutSnapshot1,
- compactionLogFile3,
- compactionLogFile4),
- null,
- expectedNodes,
- 4,
- 0
- ),
- Arguments.of("Compaction log doesn't have snapshot because OM" +
- " restarted. Restart happened after snapshot to be deleted.",
- Arrays.asList(compactionLogFile0, compactionLogFile1,
- compactionLogFile2, compactionLogFile3,
- compactionLogFileWithoutSnapshot2,
- compactionLogFileOnlyWithSnapshot4),
- null,
- expectedNodes,
- 4,
- 0
- ),
- Arguments.of("No compaction happened in between two snapshots.",
- Arrays.asList(compactionLogFile0, compactionLogFile1,
- compactionLogFile2, compactionLogFile3,
- compactionLogFileOnlyWithSnapshot1,
- compactionLogFileOnlyWithSnapshot2, compactionLogFile4),
- null,
- expectedNodes,
- 4,
- 0
- ),
- Arguments.of("Only contains snapshots but no compaction.",
- Arrays.asList(compactionLogFileOnlyWithSnapshot1,
- compactionLogFileOnlyWithSnapshot2,
- compactionLogFileOnlyWithSnapshot3,
- compactionLogFileOnlyWithSnapshot4,
- compactionLogFileOnlyWithSnapshot5,
- compactionLogFileOnlyWithSnapshot6),
- null,
- Collections.emptySet(),
- 0,
- 0
- ),
- Arguments.of("No file exists because compaction has not happened" +
- " and snapshot is not taken.",
- Collections.emptyList(),
- null,
- Collections.emptySet(),
- 0,
- 0
- ),
- Arguments.of("When compaction table is used case 1.",
- null,
- asList(createCompactionEntry(1500,
- (currentTimeMillis - MINUTES.toMillis(24)),
- asList("000015", "000013", "000011", "000009"),
- asList("000018", "000016", "000017")),
- createCompactionEntry(2500,
- (currentTimeMillis - MINUTES.toMillis(20)),
- asList("000018", "000016", "000017", "000026", "000024",
- "000022", "000020"),
- asList("000027", "000030", "000028", "000031", "000029")),
- createCompactionEntry(3500,
- (currentTimeMillis - MINUTES.toMillis(16)),
- asList("000027", "000030", "000028", "000031", "000029",
- "000039", "000037", "000035", "000033"),
- asList("000040", "000044", "000042", "000043", "000046",
- "000041", "000045")),
- createCompactionEntry(4500,
- (currentTimeMillis - MINUTES.toMillis(12)),
- asList("000040", "000044", "000042", "000043", "000046",
- "000041", "000045", "000054", "000052", "000050",
- "000048"),
- asList("000059", "000055", "000056", "000060", "000057",
- "000058"))),
- expectedNodes,
- 4,
- 0
- ),
- Arguments.of("When compaction table is used case 2.",
- null,
- asList(createCompactionEntry(1500,
- (currentTimeMillis - MINUTES.toMillis(24)),
- asList("000015", "000013", "000011", "000009"),
- asList("000018", "000016", "000017")),
- createCompactionEntry(2500,
- (currentTimeMillis - MINUTES.toMillis(18)),
- asList("000018", "000016", "000017", "000026", "000024",
- "000022", "000020"),
- asList("000027", "000030", "000028", "000031", "000029")),
- createCompactionEntry(3500,
- (currentTimeMillis - MINUTES.toMillis(12)),
- asList("000027", "000030", "000028", "000031", "000029",
- "000039", "000037", "000035", "000033"),
- asList("000040", "000044", "000042", "000043", "000046",
- "000041", "000045")),
- createCompactionEntry(4500,
- (currentTimeMillis - MINUTES.toMillis(6)),
- asList("000040", "000044", "000042", "000043", "000046",
- "000041", "000045", "000054", "000052", "000050",
- "000048"),
- asList("000059", "000055", "000056", "000060", "000057",
- "000058"))),
- ImmutableSet.of("000059", "000055", "000056", "000060", "000057",
- "000058", "000040", "000044", "000042", "000043", "000046",
- "000041", "000045", "000054", "000052", "000050", "000048"),
- 4,
- 1
- )
- );
- }
-
- /**
- * End-to-end test for snapshot's compaction history pruning.
- */
- @ParameterizedTest(name = "{0}")
- @MethodSource("compactionDagPruningScenarios")
- public void testPruneOlderSnapshotsWithCompactionHistory(
- String description,
- List<String> compactionLogs,
- List<CompactionLogEntry> compactionLogEntries,
- Set<String> expectedNodes,
- int expectedNumberOfLogEntriesBeforePruning,
- int expectedNumberOfLogEntriesAfterPruning
- ) throws IOException, ExecutionException, InterruptedException,
- TimeoutException {
- List<File> filesCreated = new ArrayList<>();
-
- if (compactionLogs != null) {
- for (int i = 0; i < compactionLogs.size(); i++) {
- String compactionFileName = METADATA_DIR_NAME + "/" +
COMPACTION_LOG_DIR_NAME
- + "/0000" + i + COMPACTION_LOG_FILE_NAME_SUFFIX;
- File compactionFile = new File(compactionFileName);
- Files.write(compactionFile.toPath(),
- compactionLogs.get(i).getBytes(UTF_8));
- filesCreated.add(compactionFile);
- }
- } else if (compactionLogEntries != null) {
- compactionLogEntries.forEach(entry ->
- rocksDBCheckpointDiffer.addToCompactionLogTable(entry));
- } else {
- throw new IllegalArgumentException("One of compactionLog or" +
- " compactionLogEntries should be present.");
- }
-
- rocksDBCheckpointDiffer.loadAllCompactionLogs();
- assertEquals(expectedNumberOfLogEntriesBeforePruning,
- countEntriesInCompactionLogTable());
- waitForLock(rocksDBCheckpointDiffer,
- RocksDBCheckpointDiffer::pruneOlderSnapshotsWithCompactionHistory);
-
- Set<String> actualNodesInForwardDAG = rocksDBCheckpointDiffer
- .getForwardCompactionDAG()
- .nodes()
- .stream()
- .map(CompactionNode::getFileName)
- .collect(Collectors.toSet());
-
- Set<String> actualNodesBackwardDAG = rocksDBCheckpointDiffer
- .getBackwardCompactionDAG()
- .nodes()
- .stream()
- .map(CompactionNode::getFileName)
- .collect(Collectors.toSet());
-
- assertEquals(expectedNodes, actualNodesInForwardDAG);
- assertEquals(expectedNodes, actualNodesBackwardDAG);
-
- for (int i = 0; compactionLogs != null && i < compactionLogs.size(); i++) {
- File compactionFile = filesCreated.get(i);
- assertFalse(compactionFile.exists());
- }
-
- assertEquals(expectedNumberOfLogEntriesAfterPruning,
- countEntriesInCompactionLogTable());
- }
-
- private int countEntriesInCompactionLogTable() {
- try (ManagedRocksIterator iterator = new ManagedRocksIterator(
- activeRocksDB.get().newIterator(compactionLogTableCFHandle))) {
- iterator.get().seekToFirst();
- int count = 0;
- while (iterator.get().isValid()) {
- iterator.get().next();
- count++;
- }
- return count;
- }
- }
-
// Take the lock, confirm that the consumer doesn't finish
// then release the lock and confirm that the consumer does finish.
private void waitForLock(RocksDBCheckpointDiffer differ,
- Consumer<RocksDBCheckpointDiffer> c)
+ Consumer<RocksDBCheckpointDiffer> c)
throws InterruptedException, ExecutionException, TimeoutException {
Future<Boolean> future;
// Take the lock and start the consumer.
try (BootstrapStateHandler.Lock lock =
- differ.getBootstrapStateLock().lock()) {
+ differ.getBootstrapStateLock().lock()) {
future = executorService.submit(
() -> {
c.accept(differ);
@@ -1779,7 +1297,7 @@ private static Stream<Arguments>
sstFilePruningScenarios() {
);
}
- private static CompactionLogEntry createCompactionEntry(long
dbSequenceNumber,
+ static CompactionLogEntry createCompactionEntry(long dbSequenceNumber,
long compactionTime,
List<String>
inputFiles,
List<String>
outputFiles) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]