This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ea832744df3 [fix](journal) ensure txns are matched with the master 
before replaying (#28192)
ea832744df3 is described below

commit ea832744df3a243793cb881cf3b4344590bc54f7
Author: walter <w41te...@gmail.com>
AuthorDate: Wed Dec 13 18:14:51 2023 +0800

    [fix](journal) ensure txns are matched with the master before replaying 
(#28192)
---
 .../apache/doris/journal/bdbje/BDBJEJournal.java   |  72 +++++++++++--
 .../doris/journal/bdbje/BDBEnvironmentTest.java    | 111 +++++++++++++++++++--
 2 files changed, 167 insertions(+), 16 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
index 1781d8a56d6..ebdbadae192 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java
@@ -38,12 +38,16 @@ import com.sleepycat.je.DatabaseNotFoundException;
 import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.StatsConfig;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
 import com.sleepycat.je.rep.InsufficientLogException;
 import com.sleepycat.je.rep.NetworkRestore;
 import com.sleepycat.je.rep.NetworkRestoreConfig;
+import com.sleepycat.je.rep.ReplicaConsistencyException;
 import com.sleepycat.je.rep.ReplicaWriteException;
 import com.sleepycat.je.rep.ReplicatedEnvironment;
 import com.sleepycat.je.rep.RollbackException;
+import com.sleepycat.je.rep.TimeConsistencyPolicy;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
@@ -52,6 +56,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 /*
@@ -124,10 +129,7 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
 
         // id is the key
         long id = nextJournalId.getAndIncrement();
-        Long idLong = id;
-        DatabaseEntry theKey = new DatabaseEntry();
-        TupleBinding<Long> idBinding = 
TupleBinding.getPrimitiveBinding(Long.class);
-        idBinding.objectToEntry(idLong, theKey);
+        DatabaseEntry theKey = idToKey(id);
 
         // entity is the value
         DataOutputBuffer buffer = new 
DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE);
@@ -203,6 +205,13 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
         return id;
     }
 
+    private static DatabaseEntry idToKey(Long id) {
+        DatabaseEntry theKey = new DatabaseEntry();
+        TupleBinding<Long> idBinding = 
TupleBinding.getPrimitiveBinding(Long.class);
+        idBinding.objectToEntry(id, theKey);
+        return theKey;
+    }
+
     @Override
     public JournalEntity read(long journalId) {
         List<Long> dbNames = getDatabaseNames();
@@ -224,7 +233,7 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
         }
 
         JournalEntity ret = null;
-        Long key = new Long(journalId);
+        Long key = journalId;
         DatabaseEntry theKey = new DatabaseEntry();
         TupleBinding<Long> myBinding = 
TupleBinding.getPrimitiveBinding(Long.class);
         myBinding.objectToEntry(key, theKey);
@@ -270,7 +279,7 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
         if (dbNames == null) {
             return ret;
         }
-        if (dbNames.size() == 0) {
+        if (dbNames.isEmpty()) {
             return ret;
         }
 
@@ -278,9 +287,52 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
         String dbName = dbNames.get(index).toString();
         long dbNumberName = dbNames.get(index);
         Database database = bdbEnvironment.openDatabase(dbName);
-        ret = dbNumberName + database.count() - 1;
+        if (!isReplicaTxnAreMatched(database, dbNumberName)) {
+            LOG.warn("The current replica hasn't synced up with the master, 
current db name: {}", dbNumberName);
+            if (index != 0) {
+                // Because roll journal occurs after write, the previous write 
must have
+                // been replicated to the majority, so it can be guaranteed 
that the database
+                // will not be rolled back.
+                return dbNumberName - 1;
+            }
+            return -1;
+        }
+        return dbNumberName + database.count() - 1;
+    }
 
-        return ret;
+    // Whether the replica txns are matched with the master.
+    //
+    // BDBJE could throw InsufficientAcksException during post commit; at that 
time the
+    // log has already been persisted to disk. When the replica is restarted, we must 
ensure that,
+    // before replaying the journals, it syncs up txns with the new master in the 
cluster and
+    // rolls back the txns that have been persisted but not committed to 
the majority.
+    //
+    // See 
org.apache.doris.journal.bdbje.BDBEnvironmentTest#testReadTxnIsNotMatched for 
details.
+    private boolean isReplicaTxnAreMatched(Database database, Long id) {
+        // The time lag is set to Integer.MAX_VALUE if the replica hasn't 
synced up
+        // with the master. By allowing a very large lag, we can detect 
whether the
+        // replica has synced up with the master.
+        TimeConsistencyPolicy consistencyPolicy = new TimeConsistencyPolicy(
+                1, TimeUnit.DAYS, 1, TimeUnit.MINUTES);
+        Transaction txn = null;
+        try {
+            TransactionConfig cfg = new TransactionConfig()
+                    .setReadOnly(true)
+                    .setReadCommitted(true)
+                    .setConsistencyPolicy(consistencyPolicy);
+
+            txn = 
bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, cfg);
+
+            DatabaseEntry key = idToKey(id);
+            database.get(txn, key, null, LockMode.READ_COMMITTED);
+            return true;
+        } catch (ReplicaConsistencyException e) {
+            return false;
+        } finally {
+            if (txn != null) {
+                txn.abort();
+            }
+        }
     }
 
     @Override
@@ -293,7 +345,7 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
         if (dbNames == null) {
             return ret;
         }
-        if (dbNames.size() == 0) {
+        if (dbNames.isEmpty()) {
             return ret;
         }
 
@@ -350,7 +402,7 @@ public class BDBJEJournal implements Journal { // 
CHECKSTYLE IGNORE THIS LINE: B
                     LOG.error("fail to get dbNames while open bdbje journal. 
will exit");
                     System.exit(-1);
                 }
-                if (dbNames.size() == 0) {
+                if (dbNames.isEmpty()) {
                     /*
                      * This is the very first time to open. Usually, we will 
open a new database
                      * named "1".
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
index 09b10b604b8..f61fbc6bf98 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java
@@ -31,8 +31,11 @@ import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.Durability;
 import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.rep.InsufficientAcksException;
 import com.sleepycat.je.rep.ReplicatedEnvironment;
+import com.sleepycat.je.rep.impl.RepImpl;
 import com.sleepycat.je.rep.util.ReplicationGroupAdmin;
+import mockit.Expectations;
 import mockit.Mock;
 import mockit.MockUp;
 import org.apache.commons.io.FileUtils;
@@ -46,6 +49,7 @@ import org.junit.jupiter.api.RepeatedTest;
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.DatagramSocket;
 import java.net.ServerSocket;
 import java.net.SocketException;
@@ -367,15 +371,16 @@ public class BDBEnvironmentTest {
         for (int i = 0; i < 10; i++) {
             electionSuccess = true;
             for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
-                if (entryPair.first.getReplicatedEnvironment().getState()
-                        .equals(ReplicatedEnvironment.State.MASTER)) {
+                ReplicatedEnvironment env = 
entryPair.first.getReplicatedEnvironment();
+                if (env == null) {
+                    continue;
+                }
+                if (env.getState().equals(ReplicatedEnvironment.State.MASTER)) 
{
                     masterEnvironment = entryPair.first;
                     masterNode = entryPair.second;
                 }
-                if (!entryPair.first.getReplicatedEnvironment().getState()
-                        .equals(ReplicatedEnvironment.State.MASTER)
-                        && 
!entryPair.first.getReplicatedEnvironment().getState()
-                        .equals(ReplicatedEnvironment.State.REPLICA)) {
+                if (!env.getState().equals(ReplicatedEnvironment.State.MASTER)
+                        && 
!env.getState().equals(ReplicatedEnvironment.State.REPLICA)) {
                     electionSuccess = false;
                 }
             }
@@ -559,4 +564,98 @@ public class BDBEnvironmentTest {
         Assertions.assertEquals(Durability.ReplicaAckPolicy.SIMPLE_MAJORITY,
                 Deencapsulation.invoke(BDBEnvironment.class, "getAckPolicy", 
"default"));
     }
+
+    @RepeatedTest(1)
+    public void testReadTxnIsNotMatched() throws Exception {
+        List<Pair<BDBEnvironment, NodeInfo>> followersInfo = new ArrayList<>();
+
+        int masterPort = findValidPort();
+        String masterNodeName = "fe1";
+        String masterNodeHostPort = "127.0.0.1:" + masterPort;
+
+        BDBEnvironment masterEnvironment = new BDBEnvironment(true, false);
+        String masterDir = createTmpDir();
+        masterEnvironment.setup(new File(masterDir), masterNodeName, 
masterNodeHostPort, masterNodeHostPort);
+        followersInfo.add(Pair.of(masterEnvironment, new 
NodeInfo(masterNodeName, masterNodeHostPort, masterDir)));
+
+        for (int i = 2; i <= 3; i++) {
+            int nodePort = findValidPort();
+            String nodeName = "fe" + i;
+            String nodeHostPort = "127.0.0.1:" + nodePort;
+
+            BDBEnvironment followerEnvironment = new BDBEnvironment(true, 
false);
+            String nodeDir = createTmpDir();
+            followerEnvironment.setup(new File(nodeDir), nodeName, 
nodeHostPort, masterNodeHostPort);
+            followersInfo.add(Pair.of(followerEnvironment, new 
NodeInfo(nodeName, nodeHostPort, nodeDir)));
+        }
+
+        Pair<BDBEnvironment, NodeInfo> masterPair = findMaster(followersInfo);
+        String beginDbName = String.valueOf(0L);
+        Database masterDb = masterPair.first.openDatabase(beginDbName);
+        DatabaseEntry key = new DatabaseEntry(randomBytes());
+        DatabaseEntry value = new DatabaseEntry(randomBytes());
+        Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, 
key, value));
+        Assertions.assertEquals(1, 
masterEnvironment.getDatabaseNames().size());
+        LOG.info("master is {} | {}", masterPair.second.name, 
masterPair.second.dir);
+
+        for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+            if (entryPair.second.dir.equals(masterPair.second.dir)) {
+                LOG.info("skip {}", entryPair.second.name);
+                continue;
+            }
+
+            Assertions.assertEquals(1, 
entryPair.first.getDatabaseNames().size());
+            Database followerDb = entryPair.first.openDatabase(beginDbName);
+            DatabaseEntry readValue = new DatabaseEntry();
+            Assertions.assertEquals(OperationStatus.SUCCESS, 
followerDb.get(null, key, readValue, LockMode.READ_COMMITTED));
+            Assertions.assertEquals(new String(value.getData()), new 
String(readValue.getData()));
+        }
+
+        Field envImplField = 
ReplicatedEnvironment.class.getDeclaredField("repEnvironmentImpl");
+        envImplField.setAccessible(true);
+        RepImpl impl = (RepImpl) 
envImplField.get(masterPair.first.getReplicatedEnvironment());
+        Assertions.assertNotNull(impl);
+
+        new Expectations(impl) {{
+                // Below method will replicate log item to followers.
+                impl.registerVLSN(withNotNull());
+                // Below method will wait until the logs are replicated.
+                impl.postLogCommitHook(withNotNull(), withNotNull());
+                result = new InsufficientAcksException("mocked");
+            }};
+
+        long count = masterDb.count();
+        final Database oldMasterDb = masterDb;
+        Assertions.assertThrows(InsufficientAcksException.class, () -> {
+            // Since this key is not replicated to any replicas, it should not 
be read.
+            DatabaseEntry k = new DatabaseEntry(new byte[]{1, 2, 3});
+            DatabaseEntry v = new DatabaseEntry(new byte[]{4, 5, 6});
+            oldMasterDb.put(null, k, v);
+        });
+
+        LOG.info("close old master {} | {}", masterPair.second.name, 
masterPair.second.dir);
+        masterDb.close();
+        masterEnvironment.getEpochDB().close();
+        masterEnvironment.close();
+
+        for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) {
+            if (entryPair.second.dir.equals(masterPair.second.dir)) {
+                LOG.info("skip {}", entryPair.second.name);
+                continue;
+            }
+            LOG.info("close follower {} | {}", entryPair.second.name, 
entryPair.second.dir);
+            entryPair.first.close();
+        }
+
+        masterPair.first.openReplicatedEnvironment(new 
File(masterPair.second.dir));
+        masterDb = masterPair.first.openDatabase(beginDbName);
+        LOG.info("open {} | {}", masterPair.second.name, 
masterPair.second.dir);
+
+        // The locally committed txn is still readable, even though it was never replicated!
+        Assertions.assertEquals(count + 1, masterDb.count());
+
+        key = new DatabaseEntry(new byte[]{1, 2, 3});
+        DatabaseEntry readValue = new DatabaseEntry();
+        Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null, 
key, readValue, LockMode.READ_COMMITTED));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to