This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b64445effbf [feat](fe) add recovery_journal_id to truncate bdbje logs 
(#60738)
b64445effbf is described below

commit b64445effbfd2c06ae9df0ba4ff56d0e045e758a
Author: walter <[email protected]>
AuthorDate: Sat Feb 28 11:38:56 2026 +0800

    [feat](fe) add recovery_journal_id to truncate bdbje logs (#60738)
    
    It only takes effect in metadata failure recovery mode (`--metadata_failure_recovery`).
    
    Problem Summary:
    When using metadata failure recovery mode, sometimes the journal logs
    need to be truncated to a specific journal ID to recover from metadata
    corruption. This PR adds a new parameter `--recovery_journal_id` to
    allow users to specify the target journal ID, and all journals with IDs
    greater than this value will be removed.
    
    ### How to use
    
    When the Frontend (FE) metadata is corrupted and needs to be recovered,
    follow these steps:
    
    1. Stop the FE process
    2. Start FE with both `--metadata_failure_recovery` and
    `--recovery_journal_id` parameters:
    
    ```bash
    ./start_fe.sh --metadata_failure_recovery --recovery_journal_id <journal_id>
    ```
    
    For example, to recover to journal ID 12345:
    ```bash
    ./start_fe.sh --metadata_failure_recovery --recovery_journal_id 12345
    ```
    
    3. The system will:
       - Remove all journal databases whose starting journal ID is greater
         than the specified journal ID
       - Truncate the database that contains the specified journal ID by
         deleting all keys greater than the specified value
       - Start FE in metadata failure recovery mode
    
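    **Note**: This operation permanently deletes metadata journals. Make
    sure to back up your data before using this feature. If the specified
    journal ID is greater than or equal to the current maximum journal ID,
    nothing is truncated; if it is smaller than the minimum journal ID, FE
    exits with an error.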
---
 bin/start_fe.sh                                    | 12 ++-
 .../src/main/java/org/apache/doris/DorisFE.java    | 12 +++
 .../java/org/apache/doris/common/FeConstants.java  |  1 +
 .../apache/doris/journal/bdbje/BDBEnvironment.java | 92 ++++++++++++++++++++++
 .../apache/doris/journal/bdbje/BDBJEJournal.java   | 75 ++++++++++++++++++
 .../doris/journal/bdbje/BDBEnvironmentTest.java    | 88 ++++++++++++++++++---
 .../doris/journal/bdbje/BDBJEJournalTest.java      | 87 ++++++++++++++++++++
 7 files changed, 353 insertions(+), 14 deletions(-)

diff --git a/bin/start_fe.sh b/bin/start_fe.sh
index aedea1ad791..edf84001b80 100755
--- a/bin/start_fe.sh
+++ b/bin/start_fe.sh
@@ -33,6 +33,7 @@ OPTS="$(getopt \
     -l 'image:' \
     -l 'version' \
     -l 'metadata_failure_recovery' \
+    -l 'recovery_journal_id:' \
     -l 'console' \
     -l 'cluster_snapshot:' \
     -- "$@")"
@@ -46,6 +47,7 @@ IMAGE_PATH=''
 IMAGE_TOOL=''
 OPT_VERSION=''
 METADATA_FAILURE_RECOVERY=''
+RECOVERY_JOURNAL_ID=''
 CLUSTER_SNAPSHOT=''
 while true; do
     case "$1" in
@@ -65,6 +67,10 @@ while true; do
         METADATA_FAILURE_RECOVERY="-r"
         shift
         ;;
+    --recovery_journal_id)
+        RECOVERY_JOURNAL_ID="--recovery_journal_id $2"
+        shift 2
+        ;;
     --helper)
         HELPER="$2"
         shift 2
@@ -418,12 +424,12 @@ if [[ "${IMAGE_TOOL}" -eq 1 ]]; then
         echo "Internal error, USE IMAGE_TOOL like: ./start_fe.sh --image 
image_path"
     fi
 elif [[ "${RUN_DAEMON}" -eq 1 ]]; then
-    nohup ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} 
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" 
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} 
"${METADATA_FAILURE_RECOVERY}" "${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" 
2>&1 </dev/null &
+    nohup ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} 
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" 
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} 
"${METADATA_FAILURE_RECOVERY}" "${RECOVERY_JOURNAL_ID:+${RECOVERY_JOURNAL_ID}}" 
"${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" 2>&1 </dev/null &
 elif [[ "${RUN_CONSOLE}" -eq 1 ]]; then
     export DORIS_LOG_TO_STDERR=1
-    ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} 
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" 
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} 
${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}" 
"${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" </dev/null
+    ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} 
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" 
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} 
${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}" 
"${RECOVERY_JOURNAL_ID:+${RECOVERY_JOURNAL_ID}}" "${CLUSTER_SNAPSHOT}" "$@" 
>>"${STDOUT_LOGGER}" </dev/null
 else
-    ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} 
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" 
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} 
${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}" 
"${CLUSTER_SNAPSHOT}" "$@" >>"${STDOUT_LOGGER}" 2>&1 </dev/null
+    ${LIMIT:+${LIMIT}} "${JAVA}" ${final_java_opt:+${final_java_opt}} 
-XX:-OmitStackTraceInFastThrow -XX:OnOutOfMemoryError="kill -9 %p" 
${coverage_opt:+${coverage_opt}} org.apache.doris.DorisFE ${HELPER:+${HELPER}} 
${OPT_VERSION:+${OPT_VERSION}} "${METADATA_FAILURE_RECOVERY}" 
"${RECOVERY_JOURNAL_ID:+${RECOVERY_JOURNAL_ID}}" "${CLUSTER_SNAPSHOT}" "$@" 
>>"${STDOUT_LOGGER}" 2>&1 </dev/null
 fi
 
 if [[ "${OPT_VERSION}" != "" ]]; then
diff --git a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java 
b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
index 282f3300ee8..afdf4c69e87 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/DorisFE.java
@@ -51,6 +51,7 @@ import io.netty.util.internal.logging.Log4JLoggerFactory;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 import org.apache.logging.log4j.LogManager;
@@ -351,6 +352,9 @@ public class DorisFE {
         options.addOption("m", "metaversion", true, "Specify the meta version 
to decode log value");
         options.addOption("r", FeConstants.METADATA_FAILURE_RECOVERY_KEY, 
false,
                 "Check if the specified metadata recover is valid");
+        
options.addOption(Option.builder().longOpt(FeConstants.RECOVERY_JOURNAL_ID_KEY).hasArg()
+                .desc("Specify the recovery truncate journal id, and journals 
greater than this id will be removed")
+                .build());
         options.addOption("c", "cluster_snapshot", true, "Specify the cluster 
snapshot json file");
 
         CommandLine cmd = null;
@@ -388,6 +392,14 @@ public class DorisFE {
         if (cmd.hasOption('r') || 
cmd.hasOption(FeConstants.METADATA_FAILURE_RECOVERY_KEY)) {
             System.setProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY, 
"true");
         }
+        if (cmd.hasOption(FeConstants.RECOVERY_JOURNAL_ID_KEY)) {
+            String recoveryJournalId = 
cmd.getOptionValue(FeConstants.RECOVERY_JOURNAL_ID_KEY);
+            if (Strings.isNullOrEmpty(recoveryJournalId)) {
+                System.err.println("recovery_journal_id is missing");
+                System.exit(-1);
+            }
+            System.setProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY, 
recoveryJournalId.trim());
+        }
         if (cmd.hasOption('b') || cmd.hasOption("bdb")) {
             if (cmd.hasOption('l') || cmd.hasOption("listdb")) {
                 // list bdb je databases
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 491a8e036af..dc2e2aafbdc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -67,4 +67,5 @@ public class FeConstants {
     public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME = 
"cloud_cache_hotspot";
 
     public static String METADATA_FAILURE_RECOVERY_KEY = 
"metadata_failure_recovery";
+    public static String RECOVERY_JOURNAL_ID_KEY = "recovery_journal_id";
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
index 1dc965a603e..d2caabdec27 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBEnvironment.java
@@ -26,8 +26,11 @@ import org.apache.doris.ha.HAProtocol;
 import org.apache.doris.system.Frontend;
 
 import com.google.common.collect.ImmutableList;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.Cursor;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.DatabaseNotFoundException;
 import com.sleepycat.je.Durability;
@@ -35,6 +38,8 @@ import com.sleepycat.je.Durability.ReplicaAckPolicy;
 import com.sleepycat.je.Durability.SyncPolicy;
 import com.sleepycat.je.EnvironmentConfig;
 import com.sleepycat.je.EnvironmentFailureException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.rep.InsufficientLogException;
 import com.sleepycat.je.rep.NetworkRestore;
 import com.sleepycat.je.rep.NetworkRestoreConfig;
@@ -335,6 +340,93 @@ public class BDBEnvironment {
         }
     }
 
+    // Remove journals whose id is greater than truncateToJournalId.
+    public void truncateJournalsGreaterThan(long truncateToJournalId) {
+        lock.writeLock().lock();
+        try {
+            List<Long> dbNames = getDatabaseNames();
+            if (dbNames == null || dbNames.isEmpty()) {
+                return;
+            }
+
+            Long minJournalId = dbNames.get(0);
+            Long targetDbName = null;
+            for (Long dbName : dbNames) {
+                if (dbName <= truncateToJournalId) {
+                    targetDbName = dbName;
+                } else {
+                    break;
+                }
+            }
+
+            if (targetDbName == null) {
+                throw new IllegalArgumentException("truncate journal id " + 
truncateToJournalId
+                        + " is smaller than min journal id " + minJournalId);
+            }
+
+            for (Long dbName : dbNames) {
+                if (dbName > truncateToJournalId) {
+                    removeDatabase(dbName.toString());
+                }
+            }
+
+            long deletedCount = truncateTailInDb(targetDbName.toString(), 
truncateToJournalId);
+            LOG.info("truncate journals greater than {} finished, targetDb {}, 
deleted {} keys in target db",
+                    truncateToJournalId, targetDbName, deletedCount);
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
+
+    private long truncateTailInDb(String dbName, long truncateToJournalId) {
+        Database db = openDatabase(dbName);
+        if (db == null) {
+            throw new IllegalStateException("failed to open target database " 
+ dbName + " for truncate");
+        }
+
+        long deletedCount = 0;
+        TupleBinding<Long> idBinding = 
TupleBinding.getPrimitiveBinding(Long.class);
+        com.sleepycat.je.Transaction txn = null;
+        Cursor cursor = null;
+        try {
+            txn = replicatedEnvironment.beginTransaction(null, null);
+            cursor = db.openCursor(txn, null);
+            DatabaseEntry key = new DatabaseEntry();
+            DatabaseEntry value = new DatabaseEntry();
+            while (cursor.getNext(key, value, LockMode.DEFAULT) == 
OperationStatus.SUCCESS) {
+                long journalId = idBinding.entryToObject(key);
+                if (journalId > truncateToJournalId) {
+                    cursor.delete();
+                    deletedCount++;
+                }
+            }
+            // Close cursor before committing transaction
+            cursor.close();
+            cursor = null;
+            txn.commit();
+            txn = null;
+        } catch (Exception e) {
+            if (cursor != null) {
+                try {
+                    cursor.close();
+                } catch (Exception cursorCloseEx) {
+                    LOG.warn("failed to close cursor", cursorCloseEx);
+                }
+            }
+            if (txn != null) {
+                try {
+                    txn.abort();
+                } catch (Exception abortEx) {
+                    LOG.warn("failed to abort transaction", abortEx);
+                }
+            }
+            LOG.warn("failed to truncate database {} to journal id {}", 
dbName, truncateToJournalId, e);
+            throw new IllegalStateException("failed to truncate database " + 
dbName, e);
+        }
+
+        return deletedCount;
+    }
+
     // get journal db names and sort the names
     public List<Long> getDatabaseNames() {
         // The operation before may set the current thread as interrupted.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 46c4c5f5517..f4234cc462a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -74,6 +74,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE 
IGNORE THIS LINE: B
     public static final Logger LOG = LogManager.getLogger(BDBJEJournal.class);
     private static final int OUTPUT_BUFFER_INIT_SIZE = 128;
     private static final int RETRY_TIME = 3;
+    private static final long RECOVERY_JOURNAL_ID_UNSET = -1L;
 
     private String environmentPath = null;
     private String selfNodeName;
@@ -519,6 +520,8 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
                 LOG.error("catch an exception when setup bdb environment. will 
exit.", e);
                 System.exit(-1);
             }
+
+            truncateRecoveryJournalsIfNeeded(metadataFailureRecovery);
         }
 
         // Open a new journal database or get last existing one as current 
journal
@@ -593,6 +596,78 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
                 NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), 
helperNode.getPort()));
     }
 
+    private void truncateRecoveryJournalsIfNeeded(boolean 
metadataFailureRecovery) {
+        if (!metadataFailureRecovery) {
+            return;
+        }
+
+        long recoveryJournalId = getRecoveryJournalIdOrUnset();
+        if (recoveryJournalId == RECOVERY_JOURNAL_ID_UNSET) {
+            return;
+        }
+
+        long maxJournalId = getMaxJournalIdWithoutCheck();
+        if (maxJournalId < 0) {
+            String msg = String.format("invalid metadata recovery truncate 
target %d, no journals in bdb",
+                    recoveryJournalId);
+            LOG.error(msg);
+            LogUtils.stderr(msg);
+            System.exit(-1);
+        }
+
+        if (recoveryJournalId >= maxJournalId) {
+            String msg = String.format("metadata recovery truncate target %d 
>= max journal id %d, no-op",
+                    recoveryJournalId, maxJournalId);
+            LOG.info(msg);
+            LogUtils.stdout(msg);
+            return;
+        }
+
+        long minJournalId = getMinJournalId();
+        if (minJournalId < 0 || recoveryJournalId < minJournalId) {
+            String msg = String.format("invalid metadata recovery truncate 
target %d, min journal id is %d",
+                    recoveryJournalId, minJournalId);
+            LOG.error(msg);
+            LogUtils.stderr(msg);
+            System.exit(-1);
+        }
+
+        try {
+            bdbEnvironment.truncateJournalsGreaterThan(recoveryJournalId);
+        } catch (Exception e) {
+            String msg = String.format("failed to truncate journals greater 
than %d in metadata recovery mode",
+                    recoveryJournalId);
+            LOG.error(msg, e);
+            LogUtils.stderr(msg + ", reason: " + e.getMessage());
+            System.exit(-1);
+        }
+        String msg = String.format("metadata recovery truncate finished, kept 
journals <= %d", recoveryJournalId);
+        LOG.info(msg);
+        LogUtils.stdout(msg);
+    }
+
+    private long getRecoveryJournalIdOrUnset() {
+        String journalIdStr = 
System.getProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY);
+        if (journalIdStr == null || journalIdStr.trim().isEmpty()) {
+            return RECOVERY_JOURNAL_ID_UNSET;
+        }
+
+        String trimmedJournalId = journalIdStr.trim();
+        try {
+            long journalId = Long.parseLong(trimmedJournalId);
+            if (journalId < 0) {
+                throw new NumberFormatException("recovery_journal_id must not 
be negative");
+            }
+            return journalId;
+        } catch (NumberFormatException e) {
+            String msg = String.format("invalid recovery_journal_id: %s", 
trimmedJournalId);
+            LOG.error(msg, e);
+            LogUtils.stderr(msg);
+            System.exit(-1);
+        }
+        return RECOVERY_JOURNAL_ID_UNSET;
+    }
+
     @Override
     public long getJournalNum() {
         return currentJournalDB.count();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
index 68f01981a9a..b93766b3c9d 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
@@ -26,6 +26,7 @@ import org.apache.doris.system.Frontend;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
+import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.Durability;
@@ -124,6 +125,13 @@ public class BDBEnvironmentTest {
         return byteArray;
     }
 
+    private static DatabaseEntry longToEntry(long value) {
+        DatabaseEntry key = new DatabaseEntry();
+        TupleBinding<Long> idBinding = 
TupleBinding.getPrimitiveBinding(Long.class);
+        idBinding.objectToEntry(value, key);
+        return key;
+    }
+
     // @Test
     @RepeatedTest(1)
     public void testSetup() throws Exception {
@@ -145,7 +153,7 @@ public class BDBEnvironmentTest {
         Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key, 
value));
 
         DatabaseEntry readValue = new DatabaseEntry();
-        Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key, 
readValue, LockMode.READ_COMMITTED));
+        Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key, 
readValue, LockMode.DEFAULT));
         Assertions.assertEquals(new String(value.getData()), new 
String(readValue.getData()));
 
         // Remove database
@@ -166,7 +174,7 @@ public class BDBEnvironmentTest {
         Database epochDb = bdbEnvironment.getEpochDB();
         Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.put(null, 
key, value));
         DatabaseEntry readValue2 = new DatabaseEntry();
-        Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.get(null, 
key, readValue2, LockMode.READ_COMMITTED));
+        Assertions.assertEquals(OperationStatus.SUCCESS, epochDb.get(null, 
key, readValue2, LockMode.DEFAULT));
         Assertions.assertEquals(new String(value.getData()), new 
String(readValue2.getData()));
 
         new MockUp<Env>() {
@@ -236,7 +244,7 @@ public class BDBEnvironmentTest {
         Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key, 
value));
 
         DatabaseEntry readValue = new DatabaseEntry();
-        Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key, 
readValue, LockMode.READ_COMMITTED));
+        Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key, 
readValue, LockMode.DEFAULT));
         Assertions.assertEquals(new String(value.getData()), new 
String(readValue.getData()));
         bdbEnvironment.close();
 
@@ -246,7 +254,7 @@ public class BDBEnvironmentTest {
         Database db2 = bdbEnvironment2.openDatabase(dbName);
 
         DatabaseEntry readValue2 = new DatabaseEntry();
-        Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key, 
readValue2, LockMode.READ_COMMITTED));
+        Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key, 
readValue2, LockMode.DEFAULT));
         Assertions.assertEquals(new String(value.getData()), new 
String(readValue2.getData()));
         bdbEnvironment2.close();
     }
@@ -270,7 +278,7 @@ public class BDBEnvironmentTest {
         Assertions.assertEquals(OperationStatus.SUCCESS, db.put(null, key, 
value));
 
         DatabaseEntry readValue = new DatabaseEntry();
-        Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key, 
readValue, LockMode.READ_COMMITTED));
+        Assertions.assertEquals(OperationStatus.SUCCESS, db.get(null, key, 
readValue, LockMode.DEFAULT));
         Assertions.assertEquals(new String(value.getData()), new 
String(readValue.getData()));
         bdbEnvironment.close();
 
@@ -278,11 +286,69 @@ public class BDBEnvironmentTest {
         bdbEnvironment.openReplicatedEnvironment(homeFile);
         Database db2 = bdbEnvironment.openDatabase(dbName);
         DatabaseEntry readValue2 = new DatabaseEntry();
-        Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key, 
readValue2, LockMode.READ_COMMITTED));
+        Assertions.assertEquals(OperationStatus.SUCCESS, db2.get(null, key, 
readValue2, LockMode.DEFAULT));
         Assertions.assertEquals(new String(value.getData()), new 
String(readValue2.getData()));
         bdbEnvironment.close();
     }
 
+    @RepeatedTest(1)
+    public void testTruncateJournalsGreaterThan() throws Exception {
+        int port = findValidPort();
+        String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false);
+        String selfNodeHostPort = "127.0.0.1:" + port;
+
+        File homeFile = new File(createTmpDir());
+        BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false);
+        bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort, 
selfNodeHostPort);
+
+        Database db1 = bdbEnvironment.openDatabase("1");
+        Database db11 = bdbEnvironment.openDatabase("11");
+        Database db21 = bdbEnvironment.openDatabase("21");
+        for (long i = 1; i <= 10; i++) {
+            Assertions.assertEquals(OperationStatus.SUCCESS, db1.put(null, 
longToEntry(i), new DatabaseEntry(randomBytes())));
+        }
+        for (long i = 11; i <= 20; i++) {
+            Assertions.assertEquals(OperationStatus.SUCCESS, db11.put(null, 
longToEntry(i), new DatabaseEntry(randomBytes())));
+        }
+        for (long i = 21; i <= 30; i++) {
+            Assertions.assertEquals(OperationStatus.SUCCESS, db21.put(null, 
longToEntry(i), new DatabaseEntry(randomBytes())));
+        }
+
+        bdbEnvironment.truncateJournalsGreaterThan(17);
+
+        List<Long> dbNames = bdbEnvironment.getDatabaseNames();
+        Assertions.assertEquals(2, dbNames.size());
+        Assertions.assertEquals(1L, dbNames.get(0));
+        Assertions.assertEquals(11L, dbNames.get(1));
+
+        Database db11AfterTruncate = bdbEnvironment.openDatabase("11");
+        Assertions.assertEquals(7, db11AfterTruncate.count());
+        Assertions.assertEquals(OperationStatus.NOTFOUND,
+                db11AfterTruncate.get(null, longToEntry(18), new 
DatabaseEntry(), LockMode.DEFAULT));
+        Assertions.assertEquals(OperationStatus.SUCCESS,
+                db11AfterTruncate.get(null, longToEntry(17), new 
DatabaseEntry(), LockMode.DEFAULT));
+        bdbEnvironment.close();
+    }
+
+    @RepeatedTest(1)
+    public void testTruncateJournalsGreaterThanInvalidBound() throws Exception 
{
+        int port = findValidPort();
+        String selfNodeName = Env.genFeNodeName("127.0.0.1", port, false);
+        String selfNodeHostPort = "127.0.0.1:" + port;
+
+        File homeFile = new File(createTmpDir());
+        BDBEnvironment bdbEnvironment = new BDBEnvironment(true, false);
+        bdbEnvironment.setup(homeFile, selfNodeName, selfNodeHostPort, 
selfNodeHostPort);
+
+        Database db1 = bdbEnvironment.openDatabase("1");
+        Assertions.assertEquals(OperationStatus.SUCCESS, db1.put(null, 
longToEntry(1), new DatabaseEntry(randomBytes())));
+
+        IllegalArgumentException exception = 
Assertions.assertThrows(IllegalArgumentException.class,
+                () -> bdbEnvironment.truncateJournalsGreaterThan(0));
+        Assertions.assertTrue(exception.getMessage().contains("smaller than 
min journal id"));
+        bdbEnvironment.close();
+    }
+
     /**
      * Test build a BDBEnvironment cluster (1 master + 2 follower + 1 observer)
      * @throws Exception
@@ -339,14 +405,14 @@ public class BDBEnvironmentTest {
             Assertions.assertEquals(1, 
followerEnvironment.getDatabaseNames().size());
             Database followerDb = followerEnvironment.openDatabase(dbName);
             DatabaseEntry readValue = new DatabaseEntry();
-            Assertions.assertEquals(OperationStatus.SUCCESS, 
followerDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+            Assertions.assertEquals(OperationStatus.SUCCESS, 
followerDb.get(null, key, readValue, LockMode.DEFAULT));
             Assertions.assertEquals(new String(value.getData()), new 
String(readValue.getData()));
         }
 
         Assertions.assertEquals(1, 
observerEnvironment.getDatabaseNames().size());
         Database observerDb = observerEnvironment.openDatabase(dbName);
         DatabaseEntry readValue = new DatabaseEntry();
-        Assertions.assertEquals(OperationStatus.SUCCESS, observerDb.get(null, 
key, readValue, LockMode.READ_COMMITTED));
+        Assertions.assertEquals(OperationStatus.SUCCESS, observerDb.get(null, 
key, readValue, LockMode.DEFAULT));
         Assertions.assertEquals(new String(value.getData()), new 
String(readValue.getData()));
 
         observerEnvironment.close();
@@ -453,7 +519,7 @@ public class BDBEnvironmentTest {
             Assertions.assertEquals(1, 
entryPair.first.getDatabaseNames().size());
             Database followerDb = entryPair.first.openDatabase(beginDbName);
             DatabaseEntry readValue = new DatabaseEntry();
-            Assertions.assertEquals(OperationStatus.SUCCESS, 
followerDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+            Assertions.assertEquals(OperationStatus.SUCCESS, 
followerDb.get(null, key, readValue, LockMode.DEFAULT));
             Assertions.assertEquals(new String(value.getData()), new 
String(readValue.getData()));
             followerDb.close();
         }
@@ -622,7 +688,7 @@ public class BDBEnvironmentTest {
             Assertions.assertEquals(1, 
entryPair.first.getDatabaseNames().size());
             Database followerDb = entryPair.first.openDatabase(beginDbName);
             DatabaseEntry readValue = new DatabaseEntry();
-            Assertions.assertEquals(OperationStatus.SUCCESS, 
followerDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+            Assertions.assertEquals(OperationStatus.SUCCESS, 
followerDb.get(null, key, readValue, LockMode.DEFAULT));
             Assertions.assertEquals(new String(value.getData()), new 
String(readValue.getData()));
         }
 
@@ -671,6 +737,6 @@ public class BDBEnvironmentTest {
 
         key = new DatabaseEntry(new byte[]{1, 2, 3});
         DatabaseEntry readValue = new DatabaseEntry();
-        Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null, 
key, readValue, LockMode.READ_COMMITTED));
+        Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null, 
key, readValue, LockMode.DEFAULT));
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
index 3b2c404e8e7..bb263a426df 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBJEJournalTest.java
@@ -18,6 +18,7 @@
 package org.apache.doris.journal.bdbje;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
@@ -226,6 +227,92 @@ public class BDBJEJournalTest { // CHECKSTYLE IGNORE THIS 
LINE: BDBJE should use
         journal.close();
     }
 
+    @RepeatedTest(1)
+    public void testRecoveryJournalIdNoEffectWithoutMetadataRecovery() throws 
Exception {
+        int port = findValidPort();
+        Preconditions.checkArgument(((port > 0) && (port < 65535)));
+        String nodeName = Env.genFeNodeName("127.0.0.1", port, false);
+        long replayedJournalId = 0;
+        File tmpDir = createTmpDir();
+        new MockUp<Env>() {
+            HostInfo selfNode = new HostInfo("127.0.0.1", port);
+            @Mock
+            public String getBdbDir() {
+                return tmpDir.getAbsolutePath();
+            }
+
+            @Mock
+            public HostInfo getSelfNode() {
+                return this.selfNode;
+            }
+
+            @Mock
+            public HostInfo getHelperNode() {
+                return this.selfNode;
+            }
+
+            @Mock
+            public boolean isElectable() {
+                return true;
+            }
+
+            @Mock
+            public long getReplayedJournalId() {
+                return replayedJournalId;
+            }
+        };
+
+        String oldRecovery = 
System.getProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY);
+        String oldMetadataRecovery = 
System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
+        BDBJEJournal journal = new BDBJEJournal(nodeName);
+        try {
+            System.clearProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
+            System.setProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY, "5");
+
+            journal.open();
+            for (int i = 0; i < 10; i++) {
+                if 
(journal.getBDBEnvironment().getReplicatedEnvironment().getState()
+                        .equals(ReplicatedEnvironment.State.MASTER)) {
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+            Assertions.assertEquals(ReplicatedEnvironment.State.MASTER,
+                    
journal.getBDBEnvironment().getReplicatedEnvironment().getState());
+            for (int i = 0; i < 10; i++) {
+                journal.write(OperationType.OP_TIMESTAMP, new Timestamp());
+            }
+            Assertions.assertEquals(10, journal.getMaxJournalId());
+            journal.close();
+
+            journal.open();
+            for (int i = 0; i < 10; i++) {
+                if 
(journal.getBDBEnvironment().getReplicatedEnvironment().getState()
+                        .equals(ReplicatedEnvironment.State.MASTER)) {
+                    break;
+                }
+                Thread.sleep(1000);
+            }
+            Assertions.assertEquals(ReplicatedEnvironment.State.MASTER,
+                    
journal.getBDBEnvironment().getReplicatedEnvironment().getState());
+            Assertions.assertEquals(10, journal.getMaxJournalId());
+        } finally {
+            if (journal.getBDBEnvironment() != null) {
+                journal.close();
+            }
+            if (oldRecovery == null) {
+                System.clearProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY);
+            } else {
+                System.setProperty(FeConstants.RECOVERY_JOURNAL_ID_KEY, 
oldRecovery);
+            }
+            if (oldMetadataRecovery == null) {
+                
System.clearProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
+            } else {
+                System.setProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY, 
oldMetadataRecovery);
+            }
+        }
+    }
+
     @RepeatedTest(1)
     public void testJournalBatch() throws Exception {
         int port = findValidPort();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to