This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 5627c01b1ef [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator (#25910)
5627c01b1ef is described below

commit 5627c01b1ef04b8424b781eabc4dea6963faf847
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Jun 3 05:14:48 2026 +0800

    [fix][meta] Fix ZooKeeper session reconnect race condition in PulsarZooKeeperClient.clientCreator (#25910)
---
 .../metadata/impl/PulsarZooKeeperClient.java       | 40 +++++++++++++++++++---
 .../pulsar/metadata/impl/ZKSessionWatcher.java     | 21 +++++++++++-
 2 files changed, 55 insertions(+), 6 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
index 2b39e1bf7ca..22ec0ba62fc 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -122,9 +123,34 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
                         log.info().attr("connectString", 
connectString).log("Reconnecting zookeeper");
                         // close the previous one
                         closeZkHandle();
+
+                        // ZooKeeper can deliver SyncConnected after 
createZooKeeper() returns but before zk.set(newZk)
+                        // publishes the new instance. Hold these events until 
the new instance is published, so child
+                        // watchers never observe a new-session event while 
PulsarZooKeeperClient still points at the
+                        // old handle.
+                        CountDownLatch newZkSetLatch = new CountDownLatch(1);
+                        Watcher forwardEventsWatcher = event -> {
+                            try {
+                                boolean awaited = 
newZkSetLatch.await(sessionTimeoutMs, TimeUnit.MILLISECONDS);
+                                if (!awaited) {
+                                    log.warn().attr("event", event)
+                                            .log("Timed out waiting for 
ZooKeeper instance to be published before "
+                                                    + "forwarding event");
+                                }
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                                log.warn()
+                                        .attr("event", event)
+                                        .exception(e)
+                                        .log("Interrupted while waiting for 
ZooKeeper instance to be published");
+                                return;
+                            }
+                            watcherManager.process(event);
+                        };
+
                         ZooKeeper newZk;
                         try {
-                            newZk = createZooKeeper();
+                            newZk = createZooKeeper(forwardEventsWatcher);
                         } catch (IOException | 
QuorumPeerConfig.ConfigException e) {
                             log.error()
                                     .attr("connectString", connectString)
@@ -133,8 +159,12 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
                                     .log("Failed to create zookeeper 
instance");
                             throw 
KeeperException.create(KeeperException.Code.CONNECTIONLOSS);
                         }
-                        waitForConnection();
+
+                        // Publish the new instance before releasing the 
forwarding watcher. waitForConnection() must
+                        // happen after countDown(), since it depends on the 
forwarded SyncConnected event.
                         zk.set(newZk);
+                        newZkSetLatch.countDown();
+                        waitForConnection();
                         log.info()
                                 .attr("sessionId", 
Long.toHexString(newZk.getSessionId()))
                                 .attr("connectString", connectString)
@@ -363,12 +393,12 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
     }
 
     @SuppressWarnings("deprecation")
-    protected ZooKeeper createZooKeeper() throws IOException, 
QuorumPeerConfig.ConfigException {
+    protected ZooKeeper createZooKeeper(Watcher watcher) throws IOException, 
QuorumPeerConfig.ConfigException {
         if (null != configPath) {
-            return new ZooKeeper(connectString, sessionTimeoutMs, 
watcherManager, allowReadOnlyMode,
+            return new ZooKeeper(connectString, sessionTimeoutMs, watcher, 
allowReadOnlyMode,
                     new ZKClientConfig(configPath));
         }
-        return new ZooKeeper(connectString, sessionTimeoutMs, watcherManager, 
allowReadOnlyMode);
+        return new ZooKeeper(connectString, sessionTimeoutMs, watcher, 
allowReadOnlyMode);
     }
 
     @Override
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
index e5726865d2c..08d87eb94b2 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/ZKSessionWatcher.java
@@ -87,6 +87,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
     // in the future.
     private void checkConnectionStatus() {
         try {
+            long checkedSessionId = zk.getSessionId();
             CompletableFuture<Watcher.Event.KeeperState> future = new 
CompletableFuture<>();
             zk.exists("/", false, (StatCallback) (rc, path, ctx, stat) -> {
                 switch (KeeperException.Code.get(rc)) {
@@ -112,7 +113,7 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
                 zkClientState = Watcher.Event.KeeperState.Disconnected;
             }
 
-            checkState(zkClientState);
+            checkStateIfSameSession(checkedSessionId, zkClientState);
         } catch (RejectedExecutionException | InterruptedException e) {
             task.cancel(true);
         } catch (Throwable t) {
@@ -130,6 +131,24 @@ public class ZKSessionWatcher implements AutoCloseable, Watcher {
         currentStatus = SessionEvent.SessionLost;
     }
 
+    // PulsarZooKeeperClient publishes the new ZooKeeper instance before 
forwarding the corresponding session event to
+    // watcherManager, so zk.set(newZk) happens-before this watcher observes 
the new-session event. Keep the session-id
+    // check and state transition in the same synchronized section to prevent 
stale async probes from racing with that
+    // event and overwriting the state of the newly established session.
+    private synchronized void checkStateIfSameSession(long checkedSessionId,
+                                                      
Watcher.Event.KeeperState zkClientState) {
+        long currentSessionId = zk.getSessionId();
+        if (checkedSessionId != currentSessionId) {
+            log.warn()
+                    .attr("checkedSessionId", checkedSessionId)
+                    .attr("currentSessionId", currentSessionId)
+                    .attr("zkClientState", zkClientState)
+                    .log("Ignoring ZooKeeper session state from a stale 
session");
+            return;
+        }
+        checkState(zkClientState);
+    }
+
     private synchronized void checkState(Watcher.Event.KeeperState 
zkClientState) {
         switch (zkClientState) {
         case Expired:

Reply via email to