This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new be9f97ac0f8 [fix][meta] Fix PulsarZooKeeperClient async addWatch callback retry behavior (#25913)
be9f97ac0f8 is described below
commit be9f97ac0f833f2dc74dc0f4538e647f7376461f
Author: Oneby Wang <[email protected]>
AuthorDate: Wed Jun 3 02:47:24 2026 +0800
[fix][meta] Fix PulsarZooKeeperClient async addWatch callback retry behavior (#25913)
---
.../metadata/impl/PulsarZooKeeperClient.java | 10 ++--
.../apache/pulsar/metadata/MetadataStoreTest.java | 58 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 5 deletions(-)
diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
index aba28ec1fc3..2b39e1bf7ca 100644
--- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
+++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/impl/PulsarZooKeeperClient.java
@@ -1174,7 +1174,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
}
@Override
- public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object ctx) {
+ public void addWatch(String basePath, Watcher watcher, AddWatchMode mode, VoidCallback cb, Object context) {
final Runnable proc = new ZkRetryRunnable(operationRetryPolicy, rateLimiter, setStats) {
final VoidCallback vCb = new VoidCallback() {
@@ -1185,7 +1185,7 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
if (allowRetry(worker, rc)) {
backOffAndRetry(that, worker.nextRetryWaitTime());
} else {
- vCb.processResult(rc, basePath, ctx);
+ cb.processResult(rc, path, context);
}
}
@@ -1195,15 +1195,15 @@ public class PulsarZooKeeperClient extends ZooKeeper implements Watcher, AutoClo
void zkRun() {
ZooKeeper zkHandle = zk.get();
if (null == zkHandle) {
- PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, cb, ctx);
+ PulsarZooKeeperClient.super.addWatch(basePath, watcher, mode, vCb, worker);
} else {
- zkHandle.addWatch(basePath, watcher, mode, cb, ctx);
+ zkHandle.addWatch(basePath, watcher, mode, vCb, worker);
}
}
@Override
public String toString() {
- return String.format("setData (%s, mode = %s)", basePath, mode.name());
+ return String.format("addWatch (%s, mode = %s)", basePath, mode.name());
}
};
// execute it immediately
diff --git a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
index 9d2d805c07a..d23a1b95572 100644
--- a/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
+++ b/pulsar-metadata/src/test/java/org/apache/pulsar/metadata/MetadataStoreTest.java
@@ -19,6 +19,12 @@
package org.apache.pulsar.metadata;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
@@ -40,6 +46,7 @@ import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
@@ -49,6 +56,7 @@ import java.util.function.Supplier;
import lombok.Cleanup;
import lombok.CustomLog;
import lombok.SneakyThrows;
+import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.metadata.api.GetResult;
import org.apache.pulsar.metadata.api.MetadataStore;
@@ -65,6 +73,9 @@ import org.apache.pulsar.metadata.impl.DualMetadataStore;
import org.apache.pulsar.metadata.impl.PulsarZooKeeperClient;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.metadata.impl.oxia.OxiaMetadataStore;
+import org.apache.zookeeper.AddWatchMode;
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -548,6 +559,53 @@ public class MetadataStoreTest extends BaseMetadataStoreTest {
assertFalse(zooKeeper.getClientConfig().isSaslClientEnabled());
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAsyncAddWatchRetriesWithWrapperCallback() throws Exception {
+ String path = newKey();
+ @Cleanup
+ PulsarZooKeeperClient zkClient = PulsarZooKeeperClient.newBuilder()
+ .connectString(zks.getConnectionString())
+ .sessionTimeoutMs(3000)
+ .operationRetryPolicy(new BoundExponentialBackoffRetryPolicy(0, 0, 3))
+ .build();
+
+ ZooKeeper mockZk = mock(ZooKeeper.class);
+ AtomicInteger attempts = new AtomicInteger();
+ doAnswer(invocation -> {
+ // The wrapper callback should consume this recoverable failure and retry the addWatch operation.
+ int rc = attempts.incrementAndGet() == 1
+ ? KeeperException.Code.CONNECTIONLOSS.intValue()
+ : KeeperException.Code.OK.intValue();
+ String callbackPath = invocation.getArgument(0);
+ VoidCallback callback = invocation.getArgument(3);
+ Object callbackContext = invocation.getArgument(4);
+ callback.processResult(rc, callbackPath, callbackContext);
+ return null;
+ }).when(mockZk).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE),
+ any(VoidCallback.class), any());
+
+ // Force the Pulsar wrapper to delegate the async addWatch call to our controlled ZooKeeper instance.
+ var zooKeeperRef = (AtomicReference<ZooKeeper>) WhiteboxImpl.getInternalState(zkClient, "zk");
+ zooKeeperRef.set(mockZk);
+
+ CountDownLatch callbackCalled = new CountDownLatch(1);
+ AtomicInteger callbackRc = new AtomicInteger(Integer.MIN_VALUE);
+ zkClient.addWatch(path, event -> {
+ }, AddWatchMode.PERSISTENT_RECURSIVE, (rc, callbackPath, ctx) -> {
+ callbackRc.set(rc);
+ callbackCalled.countDown();
+ }, null);
+
+ assertTrue(callbackCalled.await(5, TimeUnit.SECONDS));
+
+ // The caller should only see the final successful result after the retry, not the first CONNECTIONLOSS.
+ assertEquals(callbackRc.get(), KeeperException.Code.OK.intValue());
+ assertEquals(attempts.get(), 2);
+ verify(mockZk, times(2)).addWatch(eq(path), any(Watcher.class), eq(AddWatchMode.PERSISTENT_RECURSIVE),
+ any(VoidCallback.class), any());
+ }
+
@Test
public void testOxiaLoadConfigFromFile() throws Exception {
final String metadataStoreName =
UUID.randomUUID().toString().replaceAll("-", "");