This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cfdb73f9a2296ce5976b16082d299007cbf09329
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Jun 3 02:47:24 2026 +0800

    [fix][meta] Fix PulsarZooKeeperClient async addWatch callback retry behavior (#25913)
    
    (cherry picked from commit be9f97ac0f833f2dc74dc0f4538e647f7376461f)
---
 .../metadata/impl/PulsarZooKeeperClient.java       | 10 ++--
 .../apache/pulsar/metadata/MetadataStoreTest.java  | 58 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
index 6a995f20e74..462df69b2ea 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
@@ -1163,7 +1163,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
     }
 
     @Override
-    public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) {
+    public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object context) {
         final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) {
 
             final VoidCallback vCb = new VoidCallback() {
@@ -1174,7 +1174,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
                     if (allowRetry(worker, rc)) {
                         backOffAndRetry(that, worker.nextRetryWaitTime());
                     } else {
-                        vCb.processResult(rc, basePath, ctx);
+                        cb.processResult(rc, path, context);
                     }
                 }
 
@@ -1184,15 +1184,15 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
             void zkRun() {
                 ZooKeeper zkHandle = zk.get();
                 if (null == zkHandle) {
-                    PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, cb, ctx);
+                    PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, vCb, worker);
                 } else {
-                    zkHandle.addWatch(basePath, watcher, mode, cb, ctx);
+                    zkHandle.addWatch(basePath, watcher, mode, vCb, worker);
                 }
             }
 
             @Override
             public String toString() {
-                return String.format("setData (%s, mode = %s)", basePath, mode.name());
+                return String.format("addWatch (%s, mode = %s)", basePath, mode.name());
             }
         };
         // execute it immediately
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 259261b18c6..bae1eae71ef 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -19,6 +19,12 @@
 package org.apache.pulsar.metadata;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
@@ -40,6 +46,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
@@ -49,6 +56,7 @@ import java.util.function.Supplier;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.metadata.api.GetResult;
 import org.apache.pulsar.metadata.api.MetadataStore;
@@ -65,6 +73,9 @@ import org.apache.pulsar.metadata.impl.DualMetadataStore;
 import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient;
 import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStore;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
@@ -547,6 +558,53 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
         assertFalse(zooKeeper.getClientConfig().isSaslClientEnabled());
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAsyncAddWatchRetriesWithWrapperCallback() throws Exception {
+        String path = newKey();
+        @Cleanup
+        PulsarZooKeeperClient zkClient = PulsarZooKeeperClient.newBuilder()
+                .connectString(zks.getConnectionString())
+                .sessionTimeoutMs(3000)
+                .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(0, 0, 3))
+                .build();
+
+        ZooKeeper mockZk = mock(ZooKeeper.class);
+        AtomicInteger attempts = new AtomicInteger();
+        doAnswer(invocation -> {
+            // The wrapper callback should consume this recoverable failure and retry the addWatch operation.
+            int rc = attempts.incrementAndGet() == 1
+                    ? KeeperException.Code.CONNECTIONLOSS.intValue()
+                    : KeeperException.Code.OK.intValue();
+            String callbackPath = invocation.getArgument(0);
+            VoidCallback callback = invocation.getArgument(3);
+            Object callbackContext = invocation.getArgument(4);
+            callback.processResult(rc, callbackPath, callbackContext);
+            return null;
+        }).when(mockZk).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE),
+                any(VoidCallback.class), any());
+
+        // Force the Pulsar wrapper to delegate the async addWatch call to our controlled ZooKeeper instance.
+        var zooKeeperRef = (AtomicReference<ZooKeeper>) WhiteboxImpl.getInternalState(zkClient, "zk");
+        zooKeeperRef.set(mockZk);
+
+        CountDownLatch callbackCalled = new CountDownLatch(1);
+        AtomicInteger callbackRc = new AtomicInteger(Integer.MIN_VALUE);
+        zkClient.addWatch(path, event -> {
+        }, AddWatchMode.PERSISTENT_RECURSIVE, (rc, callbackPath, ctx) -> {
+            callbackRc.set(rc);
+            callbackCalled.countDown();
+        }, null);
+
+        assertTrue(callbackCalled.await(5, TimeUnit.SECONDS));
+
+        // The caller should only see the final successful result after the retry, not the first CONNECTIONLOSS.
+        assertEquals(callbackRc.get(), KeeperException.Code.OK.intValue());
+        assertEquals(attempts.get(), 2);
+        verify(mockZk, times(2)).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE),
+                any(VoidCallback.class), any());
+    }
+
     @Test
     public void testOxiaLoadConfigFromFile() throws Exception {
         final String metadataStoreName = UUID.randomUUID().toString().replaceAll("-", "");

Reply via email to