kezhuw commented on code in PR #2152:
URL: https://github.com/apache/zookeeper/pull/2152#discussion_r1731040660
##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java:
##########
@@ -753,29 +752,60 @@ protected void syncWithLeader(long newLeaderZxid) throws
Exception {
isPreZAB1_0 = false;
// ZOOKEEPER-3911: make sure sync the uncommitted logs
before commit them (ACK NEWLEADER).
- sock.setSoTimeout(self.tickTime * self.syncLimit);
- self.setSyncMode(QuorumPeer.SyncMode.NONE);
- zk.startupWithoutServing();
- if (zk instanceof FollowerZooKeeperServer) {
+ if (zk instanceof FollowerZooKeeperServer &&
!packetsCommitted.isEmpty()) {
long startTime = Time.currentElapsedTime();
FollowerZooKeeperServer fzk =
(FollowerZooKeeperServer) zk;
- for (PacketInFlight p : packetsNotCommitted) {
- final Request request = fzk.appendRequest(p.hdr,
p.rec, p.digest);
- requestsToAck.add(request);
+
+ /*
+ * @see https://github.com/apache/zookeeper/pull/1848
+ * Persist and process the committed txns in
"packetsNotLogged"
+ * according to "packetsCommitted", which have been
committed by
+ * the leader. For these committed proposals, there is
no need to
+ * reply ack.
+ *
+ * @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4394
+ * Keep the outstanding proposals in
"packetsNotLogged" to avoid
+ * NullPointerException when the follower receives
COMMIT packet(s)
+ * right after replying NEWLEADER ack.
+ */
+ while (!packetsCommitted.isEmpty()) {
+ long zxid = packetsCommitted.removeFirst();
+ pif = packetsNotLogged.peekFirst();
+ if (pif == null) {
+ LOG.warn("Committing 0x{}, but got no
proposal", Long.toHexString(zxid));
+ continue;
+ } else if (pif.hdr.getZxid() != zxid) {
+ LOG.warn("Committing 0x{}, but next proposal
is 0x{}",
+ Long.toHexString(zxid),
Long.toHexString(pif.hdr.getZxid()));
+ continue;
+ }
+ packetsNotLogged.removeFirst();
+ fzk.appendRequest(pif.hdr, pif.rec, pif.digest);
+ fzk.processTxn(pif.hdr, pif.rec);
}
- // persist the txns to disk
+ // @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4646
+ // Make sure to persist the txns to disk before
replying NEWLEADER ack.
fzk.getZKDatabase().commit();
- LOG.info("{} txns have been persisted and it took
{}ms",
- packetsNotCommitted.size(), Time.currentElapsedTime()
- startTime);
- packetsNotCommitted.clear();
+ LOG.info("It took {}ms to persist and commit txns in
packetsCommitted. "
+ + "{} outstanding txns left in
packetsNotLogged",
+ Time.currentElapsedTime() - startTime,
packetsNotLogged.size());
}
- // set the current epoch after all the tnxs are persisted
+ // @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4643
+ // @see
https://issues.apache.org/jira/browse/ZOOKEEPER-4785
+ // Update current epoch after the committed txns are
persisted
self.setCurrentEpoch(newEpoch);
LOG.info("Set the current epoch to {}", newEpoch);
- // send NEWLEADER ack after all the tnxs are persisted
+ // Now we almost complete the synchronization phase. Start
RequestProcessors
+ // to asynchronously process the pending txns in
"packetsNotLogged" and
+ // "packetsCommitted" later.
+ sock.setSoTimeout(self.tickTime * self.syncLimit);
+ self.setSyncMode(QuorumPeer.SyncMode.NONE);
+ zk.startupWithoutServing();
Review Comment:
I think we don't need this `startupWithoutServing` anymore. It was
introduced in #1445 to process asynchronous log process. And now we process log
synchronously.
This should solve problem @changruill raised in
https://github.com/apache/zookeeper/pull/2154#issuecomment-2167267712
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]