This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b64445effbf [feat](fe) add recovery_journal_id to truncate bdbje logs
(#60738)
b64445effbf is described below
commit b64445effbfd2c06ae9df0ba4ff56d0e045e758a
Author: walter <[email protected]>
AuthorDate: Sat Feb 28 11:38:56 2026 +0800
[feat](fe) add recovery_journal_id to truncate bdbje logs (#60738)
It only takes effect in metadata_failure_recovery mode; the option is ignored otherwise.
Problem Summary:
When using metadata failure recovery mode, sometimes the journal logs
need to be truncated to a specific journal ID to recover from metadata
corruption. This PR adds a new parameter `--recovery_journal_id` to
allow users to specify the target journal ID, and all journals with IDs
greater than this value will be removed.
### How to use
When the Frontend (FE) metadata is corrupted and needs to be recovered,
follow these steps:
1. Stop the FE process
2. Start FE with both `--metadata_failure_recovery` and
`--recovery_journal_id` parameters:
```bash
./start_fe.sh --metadata_failure_recovery --recovery_journal_id <journal_id>
```
For example, to recover to journal ID 12345:
```bash
./start_fe.sh --metadata_failure_recovery --recovery_journal_id 12345
```
3. The system will:
- Remove every journal database whose name (the first journal ID it
holds) is greater than the specified journal ID
- Truncate the target journal database by deleting keys with IDs greater
than the specified value
- Start FE in metadata failure recovery mode
**Note**: This operation permanently deletes metadata journals. Make
sure to back up your metadata before using this feature.
---
bin/start_fe.sh | 12 ++-
.../src/main/java/org/apache/doris/DorisFE.java | 12 +++
.../java/org/apache/doris/common/FeConstants.java | 1 +
.../apache/doris/journal/bdbje/BDBEnvironment.java | 92 ++++++++++++++++++++++
.../apache/doris/journal/bdbje/BDBJEJournal.java | 75 ++++++++++++++++++
.../doris/journal/bdbje/BDBEnvironmentTest.java | 88 ++++++++++++++++++---
.../doris/journal/bdbje/BDBJEJournalTest.java | 87 ++++++++++++++++++++
7 files changed, 353 insertions(+), 14 deletions(-)
diff --git a/bin/start_fe.sh b/bin/start_fe.sh
index aedea1ad791..edf84001b80 100755
--- a/bin/start_fe.sh
+++ b/bin/start_fe.sh
@@ -33,6 +33,7 @@ OPTS="$(getopt \
-l 'image:' \
-l 'version' \
-l 'metadata_failure_recovery' \
+ -l 'recovery_journal_id:' \
-l 'console' \
-l 'cluster_snapshot:' \
-- "$@")"
@@ -46,6 +47,7 @@ IMAGE_PATH=''
IMAGE_TOOL=''
OPT_VERSION=''
METADATA_FAILURE_RECOVERY=''
+RECOVERY_JOURNAL_ID=''
CLUSTER_SNAPSHOT=''
while true; do
case "$1" in
@@ -65,6 +67,10 @@ while true; do
METADATA_FAILURE_RECOVERY="-r"
shift
;;
--recovery_journal_id)
    # Emit "--recovery_journal_id=<id>" as ONE word. The launch commands expand this variable
    # inside double quotes, so a space-separated "--recovery_journal_id <id>" would reach DorisFE
    # as a single argv entry containing a space, which the CLI parser rejects as unrecognized.
    RECOVERY_JOURNAL_ID="--recovery_journal_id=$2"
    shift 2
    ;;
--helper)
HELPER="$2"
shift 2
@@ -418,12 +424,12 @@ if [[ "${IMAGE_TOOL}" -eq 1 ]]; then
echo "Internal error, USE IMAGE_TOOL like: ./start_fe.sh --image
image_path"
fi
elif [[ "${RUN_DAEMON}" -eq 1 ]]; then
- nohup ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}}
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p"
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}}
"${METADATA_FAILURE_RECOVERY}" "${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}"
2>&1 </dev/null &
+ nohup ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}}
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p"
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}}
"${METADATA_FAILURE_RECOVERY}" "${RECOVERY_JOURNAL_ID:+${RECOVERY_JOURNAL_ID}}"
"${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" 2>&1 </dev/null &
elif [[ "${RUN_CONSOLE}" -eq 1 ]]; then
export DORIS_LOG_TO_STDERR=1
- ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}}
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p"
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}}
${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}"
"${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" </dev/null
+ ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}}
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p"
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}}
${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}"
"${RECOVERY_JOURNAL_ID:+${RECOVERY_JOURNAL_ID}}" "${CLUSTER_SNAPSHOT}" "$@"
>>"${STDOUT_LOGGER}" </dev/null
else
- ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}}
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p"
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}}
${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}"
"${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" 2>&1 </dev/null
+ ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}}
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p"
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}}
${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}"
"${RECOVERY_JOURNAL_ID:+${RECOVERY_JOURNAL_ID}}" "${CLUSTER_SNAPSHOT}" "$@"
>>"${STDOUT_LOGGER}" 2>&1 </dev/null
fi
if [[ "${OPT_VERSION}" != "" ]]; then
diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
index 282f3300ee8..afdf4c69e87 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
@@ -51,6 +51,7 @@ import io.netty.util.internal.logging.Log4JLoggerFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.logging.log4j.LogManager;
@@ -351,6 +352,9 @@ public class DorisFE {
options.addOption("m", "metaversion", true, "Specify the meta version
to decode log value");
options.addOption("r", FeConstants.METADATA_FAILURE_RECOVERY_KEY,
false,
"Check if the specified metadata recover is valid");
+
options.addOption(Option.builder().longOpt(FeConstants.RECOVERY_JOURNAL_ID_KEY).hasArg()
+ .desc("Specify the recovery truncate journal id, and journals
greater than this id will be removed")
+ .build());
options.addOption("c", "cluster_snapshot", true, "Specify the cluster
snapshot json file");
CommandLine cmd = null;
@@ -388,6 +392,14 @@ public class DorisFE {
if (cmd.hasOption('r') ||
cmd.hasOption(FeConstants.METADATA_FAILURE_RECOVERY_KEY)) {
System.setProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY,
"true");
}
+ if (cmd.hasOption(FeConstants.RECOVERY_JOURNAL_ID_KEY)) {
+ String recoveryJournalId =
cmd.getOptionValue(FeConstants.RECOVERY_JOURNAL_ID_KEY);
+ if (Strings.isNullOrEmpty(recoveryJournalId)) {
+ System.err.println("recovery_journal_id is missing");
+ System.exit(-1);
+ }
+ System.setProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY,
recoveryJournalId.trim());
+ }
if (cmd.hasOption('b') || cmd.hasOption("bdb")) {
if (cmd.hasOption('l') || cmd.hasOption("listdb")) {
// list bdb je databases
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 491a8e036af..dc2e2aafbdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -67,4 +67,5 @@ public class FeConstants {
public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME =
"cloud_cache_hotspot";
public static String METADATA_FAILURE_RECOVERY_KEY =
"metadata_failure_recovery";
+ public static String RECOVERY_JOURNAL_ID_KEY = "recovery_journal_id";
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
index 1dc965a603e..d2caabdec27 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
@@ -26,8 +26,11 @@ import org.apache.doris.ha.HAProtocol;
import org.apache.doris.system.Frontend;
import com.google.common.collect.ImmutableList;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.Cursor;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DatabaseNotFoundException;
import com.sleepycat.je.Durability;
@@ -35,6 +38,8 @@ import com.sleepycat.je.Durability.ReplicaAckPolicy;
import com.sleepycat.je.Durability.SyncPolicy;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
@@ -335,6 +340,93 @@ public class BDBEnvironment {
}
}
+ // Remove journals whose id is greater than truncateToJournalId.
+ public void truncateJournalsGreaterThan(long truncateToJournalId) {
+ lock.writeLock().lock();
+ try {
+ List<Long> dbNames = getDatabaseNames();
+ if (dbNames == null || dbNames.isEmpty()) {
+ return;
+ }
+
+ Long minJournalId = dbNames.get(0);
+ Long targetDbName = null;
+ for (Long dbName : dbNames) {
+ if (dbName <= truncateToJournalId) {
+ targetDbName = dbName;
+ } else {
+ break;
+ }
+ }
+
+ if (targetDbName == null) {
+ throw new IllegalArgumentException("truncate journal id " +
truncateToJournalId
+ + " is smaller than min journal id " + minJournalId);
+ }
+
+ for (Long dbName : dbNames) {
+ if (dbName > truncateToJournalId) {
+ removeDatabase(dbName.toString());
+ }
+ }
+
+ long deletedCount = truncateTailInDb(targetDbName.toString(),
truncateToJournalId);
+ LOG.info("truncate journals greater than {} finished, targetDb {},
deleted {} keys in target db",
+ truncateToJournalId, targetDbName, deletedCount);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private long truncateTailInDb(String dbName, long truncateToJournalId) {
+ Database db = openDatabase(dbName);
+ if (db == null) {
+ throw new IllegalStateException("failed to open target database "
+ dbName + " for truncate");
+ }
+
+ long deletedCount = 0;
+ TupleBinding<Long> idBinding =
TupleBinding.getPrimitiveBinding(Long.class);
+ com.sleepycat.je.Transaction txn = null;
+ Cursor cursor = null;
+ try {
+ txn = replicatedEnvironment.beginTransaction(null, null);
+ cursor = db.openCursor(txn, null);
+ DatabaseEntry key = new DatabaseEntry();
+ DatabaseEntry value = new DatabaseEntry();
+ while (cursor.getNext(key, value, LockMode.DEFAULT) ==
OperationStatus.SUCCESS) {
+ long journalId = idBinding.entryToObject(key);
+ if (journalId > truncateToJournalId) {
+ cursor.delete();
+ deletedCount++;
+ }
+ }
+ // Close cursor before committing transaction
+ cursor.close();
+ cursor = null;
+ txn.commit();
+ txn = null;
+ } catch (Exception e) {
+ if (cursor != null) {
+ try {
+ cursor.close();
+ } catch (Exception cursorCloseEx) {
+ LOG.warn("failed to close cursor", cursorCloseEx);
+ }
+ }
+ if (txn != null) {
+ try {
+ txn.abort();
+ } catch (Exception abortEx) {
+ LOG.warn("failed to abort transaction", abortEx);
+ }
+ }
+ LOG.warn("failed to truncate database {} to journal id {}",
dbName, truncateToJournalId, e);
+ throw new IllegalStateException("failed to truncate database " +
dbName, e);
+ }
+
+ return deletedCount;
+ }
+
// get journal db names and sort the names
public List<Long> getDatabaseNames() {
// The operation before may set the current thread as interrupted.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 46c4c5f5517..f4234cc462a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -74,6 +74,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE
IGNORE THIS LINE: B
public static final Logger LOG = LogManager.getLogger(BDBJEJournal.class);
private static final int OUTPUT_BUFFER_INIT_SIZE = 128;
private static final int RETRY_TIME = 3;
+ private static final long RECOVERY_JOURNAL_ID_UNSET = -1L;
private String environmentPath = null;
private String selfNodeName;
@@ -519,6 +520,8 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
LOG.error("catch an exception when setup bdb environment. will
exit.", e);
System.exit(-1);
}
+
+ truncateRecoveryJournalsIfNeeded(metadataFailureRecovery);
}
// Open a new journal database or get last existing one as current
journal
@@ -593,6 +596,78 @@ public class BDBJEJournal implements Journal { //
CHECKSTYLE IGNORE THIS LINE: B
NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(),
helperNode.getPort()));
}
+ private void truncateRecoveryJournalsIfNeeded(boolean
metadataFailureRecovery) {
+ if (!metadataFailureRecovery) {
+ return;
+ }
+
+ long recoveryJournalId = getRecoveryJournalIdOrUnset();
+ if (recoveryJournalId == RECOVERY_JOURNAL_ID_UNSET) {
+ return;
+ }
+
+ long maxJournalId = getMaxJournalIdWithoutCheck();
+ if (maxJournalId < 0) {
+ String msg = String.format("invalid metadata recovery truncate
target %d, no journals in bdb",
+ recoveryJournalId);
+ LOG.error(msg);
+ LogUtils.stderr(msg);
+ System.exit(-1);
+ }
+
+ if (recoveryJournalId >= maxJournalId) {
+ String msg = String.format("metadata recovery truncate target %d
>= max journal id %d, no-op",
+ recoveryJournalId, maxJournalId);
+ LOG.info(msg);
+ LogUtils.stdout(msg);
+ return;
+ }
+
+ long minJournalId = getMinJournalId();
+ if (minJournalId < 0 || recoveryJournalId < minJournalId) {
+ String msg = String.format("invalid metadata recovery truncate
target %d, min journal id is %d",
+ recoveryJournalId, minJournalId);
+ LOG.error(msg);
+ LogUtils.stderr(msg);
+ System.exit(-1);
+ }
+
+ try {
+ bdbEnvironment.truncateJournalsGreaterThan(recoveryJournalId);
+ } catch (Exception e) {
+ String msg = String.format("failed to truncate journals greater
than %d in metadata recovery mode",
+ recoveryJournalId);
+ LOG.error(msg, e);
+ LogUtils.stderr(msg + ", reason: " + e.getMessage());
+ System.exit(-1);
+ }
+ String msg = String.format("metadata recovery truncate finished, kept
journals <= %d", recoveryJournalId);
+ LOG.info(msg);
+ LogUtils.stdout(msg);
+ }
+
+ private long getRecoveryJournalIdOrUnset() {
+ String journalIdStr =
System.getProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY);
+ if (journalIdStr == null || journalIdStr.trim().isEmpty()) {
+ return RECOVERY_JOURNAL_ID_UNSET;
+ }
+
+ String trimmedJournalId = journalIdStr.trim();
+ try {
+ long journalId = Long.parseLong(trimmedJournalId);
+ if (journalId < 0) {
+ throw new NumberFormatException("recovery_journal_id must not
be negative");
+ }
+ return journalId;
+ } catch (NumberFormatException e) {
+ String msg = String.format("invalid recovery_journal_id: %s",
trimmedJournalId);
+ LOG.error(msg, e);
+ LogUtils.stderr(msg);
+ System.exit(-1);
+ }
+ return RECOVERY_JOURNAL_ID_UNSET;
+ }
+
@Override
public long getJournalNum() {
return currentJournalDB.count();
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
index 68f01981a9a..b93766b3c9d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.system.Frontend;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
+import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseEntry;
import com.sleepycat.je.Durability;
@@ -124,6 +125,13 @@ public class BDBEnvironmentTest {
return byteArray;
}
+ private static DatabaseEntry longToEntry(long value) {
+ DatabaseEntry key = new DatabaseEntry();
+ TupleBinding<Long> idBinding =
TupleBinding.getPrimitiveBinding(Long.class);
+ idBinding.objectToEntry(value, key);
+ return key;
+ }
+
// @Test
@RepeatedTest(1)
public void testSetup() throws Exception {
@@ -145,7 +153,7 @@ public class BDBEnvironmentTest {
Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key,
value));
DatabaseEntry readValue = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key,
readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key,
readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
// Remove database
@@ -166,7 +174,7 @@ public class BDBEnvironmentTest {
Database epochDb = bdbEnvironment.getEpochDB();
Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.put(null,
key, value));
DatabaseEntry readValue2 = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.get(null,
key, readValue2, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.get(null,
key, readValue2, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue2.getData()));
new MockUp<Env>() {
@@ -236,7 +244,7 @@ public class BDBEnvironmentTest {
Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key,
value));
DatabaseEntry readValue = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key,
readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key,
readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
bdbEnvironment.close();
@@ -246,7 +254,7 @@ public class BDBEnvironmentTest {
Database db2 = bdbEnvironment2.openDatabase(dbName);
DatabaseEntry readValue2 = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key,
readValue2, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key,
readValue2, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue2.getData()));
bdbEnvironment2.close();
}
@@ -270,7 +278,7 @@ public class BDBEnvironmentTest {
Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key,
value));
DatabaseEntry readValue = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key,
readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key,
readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
bdbEnvironment.close();
@@ -278,11 +286,69 @@ public class BDBEnvironmentTest {
bdbEnvironment.openReplicatedEnvironment(homeFile);
Database db2 = bdbEnvironment.openDatabase(dbName);
DatabaseEntry readValue2 = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key,
readValue2, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key,
readValue2, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue2.getData()));
bdbEnvironment.close();
}
+ @RepeatedTest(1)
+ public void testTruncateJournalsGreaterThan() throws Exception {
+ int port = findValidPort();
+ String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false);
+ String selfNodeHostPort = "127.0.0.1:" + port;
+
+ File homeFile = new File(createTmpDir());
+ BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false);
+ bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort,
selfNodeHostPort);
+
+ Database db1 = bdbEnvironment.openDatabase("1");
+ Database db11 = bdbEnvironment.openDatabase("11");
+ Database db21 = bdbEnvironment.openDatabase("21");
+ for (long i = 1; i <= 10; i++) {
+ Assertions.assertEquals(OperationStatus.SUCCESS, db1.put(null,
longToEntry(i), new DatabaseEntry(randomBytes())));
+ }
+ for (long i = 11; i <= 20; i++) {
+ Assertions.assertEquals(OperationStatus.SUCCESS, db11.put(null,
longToEntry(i), new DatabaseEntry(randomBytes())));
+ }
+ for (long i = 21; i <= 30; i++) {
+ Assertions.assertEquals(OperationStatus.SUCCESS, db21.put(null,
longToEntry(i), new DatabaseEntry(randomBytes())));
+ }
+
+ bdbEnvironment.truncateJournalsGreaterThan(17);
+
+ List<Long> dbNames = bdbEnvironment.getDatabaseNames();
+ Assertions.assertEquals(2, dbNames.size());
+ Assertions.assertEquals(1L, dbNames.get(0));
+ Assertions.assertEquals(11L, dbNames.get(1));
+
+ Database db11AfterTruncate = bdbEnvironment.openDatabase("11");
+ Assertions.assertEquals(7, db11AfterTruncate.count());
+ Assertions.assertEquals(OperationStatus.NOTFOUND,
+ db11AfterTruncate.get(null, longToEntry(18), new
DatabaseEntry(), LockMode.DEFAULT));
+ Assertions.assertEquals(OperationStatus.SUCCESS,
+ db11AfterTruncate.get(null, longToEntry(17), new
DatabaseEntry(), LockMode.DEFAULT));
+ bdbEnvironment.close();
+ }
+
+ @RepeatedTest(1)
+ public void testTruncateJournalsGreaterThanInvalidBound() throws Exception
{
+ int port = findValidPort();
+ String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false);
+ String selfNodeHostPort = "127.0.0.1:" + port;
+
+ File homeFile = new File(createTmpDir());
+ BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false);
+ bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort,
selfNodeHostPort);
+
+ Database db1 = bdbEnvironment.openDatabase("1");
+ Assertions.assertEquals(OperationStatus.SUCCESS, db1.put(null,
longToEntry(1), new DatabaseEntry(randomBytes())));
+
+ IllegalArgumentException exception =
Assertions.assertThrows(IllegalArgumentException.class,
+ () -> bdbEnvironment.truncateJournalsGreaterThan(0));
+ Assertions.assertTrue(exception.getMessage().contains("smaller than
min journal id"));
+ bdbEnvironment.close();
+ }
+
/**
* Test build a BDBEnvironment cluster (1 master + 2 follower + 1 observer)
* @throws Exception
@@ -339,14 +405,14 @@ public class BDBEnvironmentTest {
Assertions.assertEquals(1,
followerEnvironment.getDatabaseNames().size());
Database followerDb = followerEnvironment.openDatabase(dbName);
DatabaseEntry readValue = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS,
followerDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS,
followerDb.get(null, key, readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
}
Assertions.assertEquals(1,
observerEnvironment.getDatabaseNames().size());
Database observerDb = observerEnvironment.openDatabase(dbName);
DatabaseEntry readValue = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS, observerDb.get(null,
key, readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS, observerDb.get(null,
key, readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
observerEnvironment.close();
@@ -453,7 +519,7 @@ public class BDBEnvironmentTest {
Assertions.assertEquals(1,
entryPair.first.getDatabaseNames().size());
Database followerDb = entryPair.first.openDatabase(beginDbName);
DatabaseEntry readValue = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS,
followerDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS,
followerDb.get(null, key, readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
followerDb.close();
}
@@ -622,7 +688,7 @@ public class BDBEnvironmentTest {
Assertions.assertEquals(1,
entryPair.first.getDatabaseNames().size());
Database followerDb = entryPair.first.openDatabase(beginDbName);
DatabaseEntry readValue = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS,
followerDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS,
followerDb.get(null, key, readValue, LockMode.DEFAULT));
Assertions.assertEquals(new String(value.getData()), new
String(readValue.getData()));
}
@@ -671,6 +737,6 @@ public class BDBEnvironmentTest {
key = new DatabaseEntry(new byte[]{1, 2, 3});
DatabaseEntry readValue = new DatabaseEntry();
- Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null,
key, readValue, LockMode.READ_COMMITTED));
+ Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null,
key, readValue, LockMode.DEFAULT));
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
index 3b2c404e8e7..bb263a426df 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
@@ -18,6 +18,7 @@
package org.apache.doris.journal.bdbje;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
@@ -226,6 +227,92 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS
LINE: BDBJE should use
journal.close();
}
+ @RepeatedTest(1)
+ public void testRecoveryJournalIdNoEffectWithoutMetadataRecovery() throws
Exception {
+ int port = findValidPort();
+ Preconditions.checkArgument(((port > 0) && (port < 65535)));
+ String nodeName = Env.genFeNodeName("127.0.0.1", port, false);
+ long replayedJournalId = 0;
+ File tmpDir = createTmpDir();
+ new MockUp<Env>() {
+ HostInfo selfNode = new HostInfo("127.0.0.1", port);
+ @Mock
+ public String getBdbDir() {
+ return tmpDir.getAbsolutePath();
+ }
+
+ @Mock
+ public HostInfo getSelfNode() {
+ return this.selfNode;
+ }
+
+ @Mock
+ public HostInfo getHelperNode() {
+ return this.selfNode;
+ }
+
+ @Mock
+ public boolean isElectable() {
+ return true;
+ }
+
+ @Mock
+ public long getReplayedJournalId() {
+ return replayedJournalId;
+ }
+ };
+
+ String oldRecovery =
System.getProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY);
+ String oldMetadataRecovery =
System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
+ BDBJEJournal journal = new BDBJEJournal(nodeName);
+ try {
+ System.clearProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
+ System.setProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY, "5");
+
+ journal.open();
+ for (int i = 0; i < 10; i++) {
+ if
(journal.getBDBEnvironment().getReplicatedEnvironment().getState()
+ .equals(ReplicatedEnvironment.State.MASTER)) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Assertions.assertEquals(ReplicatedEnvironment.State.MASTER,
+
journal.getBDBEnvironment().getReplicatedEnvironment().getState());
+ for (int i = 0; i < 10; i++) {
+ journal.write(OperationType.OP_TIMESTAMP, new Timestamp());
+ }
+ Assertions.assertEquals(10, journal.getMaxJournalId());
+ journal.close();
+
+ journal.open();
+ for (int i = 0; i < 10; i++) {
+ if
(journal.getBDBEnvironment().getReplicatedEnvironment().getState()
+ .equals(ReplicatedEnvironment.State.MASTER)) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+ Assertions.assertEquals(ReplicatedEnvironment.State.MASTER,
+
journal.getBDBEnvironment().getReplicatedEnvironment().getState());
+ Assertions.assertEquals(10, journal.getMaxJournalId());
+ } finally {
+ if (journal.getBDBEnvironment() != null) {
+ journal.close();
+ }
+ if (oldRecovery == null) {
+ System.clearProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY);
+ } else {
+ System.setProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY,
oldRecovery);
+ }
+ if (oldMetadataRecovery == null) {
+
System.clearProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
+ } else {
+ System.setProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY,
oldMetadataRecovery);
+ }
+ }
+ }
+
@RepeatedTest(1)
public void testJournalBatch() throws Exception {
int port = findValidPort();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]