This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new ea832744df3 [fix](journal) ensure txns are matched with the master before replaying (#28192) ea832744df3 is described below commit ea832744df3a243793cb881cf3b4344590bc54f7 Author: walter <w41te...@gmail.com> AuthorDate: Wed Dec 13 18:14:51 2023 +0800 [fix](journal) ensure txns are matched with the master before replaying (#28192) --- .../apache/doris/journal/bdbje/BDBJEJournal.java | 72 +++++++++++-- .../doris/journal/bdbje/BDBEnvironmentTest.java | 111 +++++++++++++++++++-- 2 files changed, 167 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java index 1781d8a56d6..ebdbadae192 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java +++ b/fe/fe-core/src/main/java/org/apache/doris/journal/bdbje/BDBJEJournal.java @@ -38,12 +38,16 @@ import com.sleepycat.je.DatabaseNotFoundException; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; import com.sleepycat.je.StatsConfig; +import com.sleepycat.je.Transaction; +import com.sleepycat.je.TransactionConfig; import com.sleepycat.je.rep.InsufficientLogException; import com.sleepycat.je.rep.NetworkRestore; import com.sleepycat.je.rep.NetworkRestoreConfig; +import com.sleepycat.je.rep.ReplicaConsistencyException; import com.sleepycat.je.rep.ReplicaWriteException; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.RollbackException; +import com.sleepycat.je.rep.TimeConsistencyPolicy; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,6 +56,7 @@ import java.io.DataInputStream; import java.io.File; import java.io.IOException; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; /* @@ -124,10 +129,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B // id is the key long id = nextJournalId.getAndIncrement(); - Long idLong = id; - DatabaseEntry theKey = new DatabaseEntry(); - TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class); - idBinding.objectToEntry(idLong, theKey); + DatabaseEntry theKey = idToKey(id); // entity is the value DataOutputBuffer buffer = new DataOutputBuffer(OUTPUT_BUFFER_INIT_SIZE); @@ -203,6 +205,13 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B return id; } + private static DatabaseEntry idToKey(Long id) { + DatabaseEntry theKey = new DatabaseEntry(); + TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class); + idBinding.objectToEntry(id, theKey); + return theKey; + } + @Override public JournalEntity read(long journalId) { List<Long> dbNames = getDatabaseNames(); @@ -224,7 +233,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B } JournalEntity ret = null; - Long key = new Long(journalId); + Long key = journalId; DatabaseEntry theKey = new DatabaseEntry(); TupleBinding<Long> myBinding = TupleBinding.getPrimitiveBinding(Long.class); myBinding.objectToEntry(key, theKey); @@ -270,7 +279,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B if (dbNames == null) { return ret; } - if (dbNames.size() == 0) { + if (dbNames.isEmpty()) { return ret; } @@ -278,9 +287,52 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B String dbName = dbNames.get(index).toString(); long dbNumberName = dbNames.get(index); Database database = bdbEnvironment.openDatabase(dbName); - ret = dbNumberName + database.count() - 1; + if (!isReplicaTxnAreMatched(database, dbNumberName)) { + LOG.warn("The current replica hasn't synced up with the master, current db name: {}", dbNumberName); + if (index != 0) { + // Because roll journal occurs after write, the previous write must have + // been replicated to the majority, so it can be guaranteed that the database + // will not be rollback. + return dbNumberName - 1; + } + return -1; + } + return dbNumberName + database.count() - 1; + } - return ret; + // Whether the replica txns are matched with the master. + // + // BDBJE could throw InsufficientAcksException during post commit, at that time the + // log has persisted in disk. When the replica is restarted, we need to ensure that + // before replaying the journals, sync up txns with the new master in the cluster and + // rollback the txns that have been persisted but have not committed to the majority. + // + // See org.apache.doris.journal.bdbje.BDBEnvironmentTest#testReadTxnIsNotMatched for details. + private boolean isReplicaTxnAreMatched(Database database, Long id) { + // The time lag is set to Integer.MAX_VALUE if the replica haven't synced up + // with the master. By allowing a very large lag, we can detect whether the + // replica has synced up with the master. + TimeConsistencyPolicy consistencyPolicy = new TimeConsistencyPolicy( + 1, TimeUnit.DAYS, 1, TimeUnit.MINUTES); + Transaction txn = null; + try { + TransactionConfig cfg = new TransactionConfig() + .setReadOnly(true) + .setReadCommitted(true) + .setConsistencyPolicy(consistencyPolicy); + + txn = bdbEnvironment.getReplicatedEnvironment().beginTransaction(null, cfg); + + DatabaseEntry key = idToKey(id); + database.get(txn, key, null, LockMode.READ_COMMITTED); + return true; + } catch (ReplicaConsistencyException e) { + return false; + } finally { + if (txn != null) { + txn.abort(); + } + } } @Override @@ -293,7 +345,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B if (dbNames == null) { return ret; } - if (dbNames.size() == 0) { + if (dbNames.isEmpty()) { return ret; } @@ -350,7 +402,7 @@ public class BDBJEJournal implements Journal { // CHECKSTYLE IGNORE THIS LINE: B LOG.error("fail to get dbNames while open bdbje journal. will exit"); System.exit(-1); } - if (dbNames.size() == 0) { + if (dbNames.isEmpty()) { /* * This is the very first time to open. Usually, we will open a new database * named "1". diff --git a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java index 09b10b604b8..f61fbc6bf98 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/journal/bdbje/BDBEnvironmentTest.java @@ -31,8 +31,11 @@ import com.sleepycat.je.DatabaseEntry; import com.sleepycat.je.Durability; import com.sleepycat.je.LockMode; import com.sleepycat.je.OperationStatus; +import com.sleepycat.je.rep.InsufficientAcksException; import com.sleepycat.je.rep.ReplicatedEnvironment; +import com.sleepycat.je.rep.impl.RepImpl; import com.sleepycat.je.rep.util.ReplicationGroupAdmin; +import mockit.Expectations; import mockit.Mock; import mockit.MockUp; import org.apache.commons.io.FileUtils; @@ -46,6 +49,7 @@ import org.junit.jupiter.api.RepeatedTest; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.net.DatagramSocket; import java.net.ServerSocket; import java.net.SocketException; @@ -367,15 +371,16 @@ public class BDBEnvironmentTest { for (int i = 0; i < 10; i++) { electionSuccess = true; for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) { - if (entryPair.first.getReplicatedEnvironment().getState() - .equals(ReplicatedEnvironment.State.MASTER)) { + ReplicatedEnvironment env = entryPair.first.getReplicatedEnvironment(); + if (env == null) { + continue; + } + if (env.getState().equals(ReplicatedEnvironment.State.MASTER)) { masterEnvironment = entryPair.first; masterNode = entryPair.second; } - if (!entryPair.first.getReplicatedEnvironment().getState() - .equals(ReplicatedEnvironment.State.MASTER) - && !entryPair.first.getReplicatedEnvironment().getState() - .equals(ReplicatedEnvironment.State.REPLICA)) { + if (!env.getState().equals(ReplicatedEnvironment.State.MASTER) + && !env.getState().equals(ReplicatedEnvironment.State.REPLICA)) { electionSuccess = false; } } @@ -559,4 +564,98 @@ public class BDBEnvironmentTest { Assertions.assertEquals(Durability.ReplicaAckPolicy.SIMPLE_MAJORITY, Deencapsulation.invoke(BDBEnvironment.class, "getAckPolicy", "default")); } + + @RepeatedTest(1) + public void testReadTxnIsNotMatched() throws Exception { + List<Pair<BDBEnvironment, NodeInfo>> followersInfo = new ArrayList<>(); + + int masterPort = findValidPort(); + String masterNodeName = "fe1"; + String masterNodeHostPort = "127.0.0.1:" + masterPort; + + BDBEnvironment masterEnvironment = new BDBEnvironment(true, false); + String masterDir = createTmpDir(); + masterEnvironment.setup(new File(masterDir), masterNodeName, masterNodeHostPort, masterNodeHostPort); + followersInfo.add(Pair.of(masterEnvironment, new NodeInfo(masterNodeName, masterNodeHostPort, masterDir))); + + for (int i = 2; i <= 3; i++) { + int nodePort = findValidPort(); + String nodeName = "fe" + i; + String nodeHostPort = "127.0.0.1:" + nodePort; + + BDBEnvironment followerEnvironment = new BDBEnvironment(true, false); + String nodeDir = createTmpDir(); + followerEnvironment.setup(new File(nodeDir), nodeName, nodeHostPort, masterNodeHostPort); + followersInfo.add(Pair.of(followerEnvironment, new NodeInfo(nodeName, nodeHostPort, nodeDir))); + } + + Pair<BDBEnvironment, NodeInfo> masterPair = findMaster(followersInfo); + String beginDbName = String.valueOf(0L); + Database masterDb = masterPair.first.openDatabase(beginDbName); + DatabaseEntry key = new DatabaseEntry(randomBytes()); + DatabaseEntry value = new DatabaseEntry(randomBytes()); + Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.put(null, key, value)); + Assertions.assertEquals(1, masterEnvironment.getDatabaseNames().size()); + LOG.info("master is {} | {}", masterPair.second.name, masterPair.second.dir); + + for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterPair.second.dir)) { + LOG.info("skip {}", entryPair.second.name); + continue; + } + + Assertions.assertEquals(1, entryPair.first.getDatabaseNames().size()); + Database followerDb = entryPair.first.openDatabase(beginDbName); + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, followerDb.get(null, key, readValue, LockMode.READ_COMMITTED)); + Assertions.assertEquals(new String(value.getData()), new String(readValue.getData())); + } + + Field envImplField = ReplicatedEnvironment.class.getDeclaredField("repEnvironmentImpl"); + envImplField.setAccessible(true); + RepImpl impl = (RepImpl) envImplField.get(masterPair.first.getReplicatedEnvironment()); + Assertions.assertNotNull(impl); + + new Expectations(impl) {{ + // Below method will replicate log item to followers. + impl.registerVLSN(withNotNull()); + // Below method will wait until the logs are replicated. + impl.postLogCommitHook(withNotNull(), withNotNull()); + result = new InsufficientAcksException("mocked"); + }}; + + long count = masterDb.count(); + final Database oldMasterDb = masterDb; + Assertions.assertThrows(InsufficientAcksException.class, () -> { + // Since this key is not replicated to any replicas, it should not be read. + DatabaseEntry k = new DatabaseEntry(new byte[]{1, 2, 3}); + DatabaseEntry v = new DatabaseEntry(new byte[]{4, 5, 6}); + oldMasterDb.put(null, k, v); + }); + + LOG.info("close old master {} | {}", masterPair.second.name, masterPair.second.dir); + masterDb.close(); + masterEnvironment.getEpochDB().close(); + masterEnvironment.close(); + + for (Pair<BDBEnvironment, NodeInfo> entryPair : followersInfo) { + if (entryPair.second.dir.equals(masterPair.second.dir)) { + LOG.info("skip {}", entryPair.second.name); + continue; + } + LOG.info("close follower {} | {}", entryPair.second.name, entryPair.second.dir); + entryPair.first.close(); + } + + masterPair.first.openReplicatedEnvironment(new File(masterPair.second.dir)); + masterDb = masterPair.first.openDatabase(beginDbName); + LOG.info("open {} | {}", masterPair.second.name, masterPair.second.dir); + + // The local commit txn is readable!!! + Assertions.assertEquals(count + 1, masterDb.count()); + + key = new DatabaseEntry(new byte[]{1, 2, 3}); + DatabaseEntry readValue = new DatabaseEntry(); + Assertions.assertEquals(OperationStatus.SUCCESS, masterDb.get(null, key, readValue, LockMode.READ_COMMITTED)); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org