This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 85bdf67617c [fix][client] Don't pin broker-assigned redirect URL
across reconnect retries (#26009)
85bdf67617c is described below
commit 85bdf67617cc922b51cf59391f9caca697ca472a
Author: Matteo Merli <[email protected]>
AuthorDate: Fri Jun 12 08:23:38 2026 -0700
[fix][client] Don't pin broker-assigned redirect URL across reconnect
retries (#26009)
---
.../client/impl/BrokerClientIntegrationTest.java | 42 ++++++++++++++++++++++
.../pulsar/client/impl/ConnectionHandler.java | 12 +++++--
2 files changed, 51 insertions(+), 3 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
index ddc955f84d8..959bbe6a80f 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java
@@ -41,6 +41,8 @@ import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import java.io.InputStream;
import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
@@ -48,6 +50,7 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
@@ -1128,4 +1131,43 @@ public class BrokerClientIntegrationTest extends
ProducerConsumerBase {
consumer1.unsubscribe(true);
assertFalse(consumer1.isConnected());
}
+
+ /**
+ * Regression test for https://github.com/apache/pulsar/issues/25997: a
broker-assigned redirect
+ * ({@code CloseProducer}/{@code CloseConsumer} carrying an {@code
assignedBrokerServiceUrl})
+ * pointing at a wrong or unreachable broker must be honored for the
immediate reconnect attempt
+ * only. If that attempt fails, subsequent retries must fall back to topic
lookup instead of
+ * staying pinned to the stale address.
+ */
+ @Test
+ public void testStaleBrokerRedirectFallsBackToLookup() throws Exception {
+ String topic = "persistent://my-property/my-ns/staleRedirectFallback";
+ @Cleanup
+ Producer<byte[]> producerInstance =
pulsarClient.newProducer().topic(topic).create();
+ @Cleanup
+ Consumer<byte[]> consumerInstance =
pulsarClient.newConsumer().topic(topic)
+ .subscriptionName("my-subscriber-name").subscribe();
+ ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>)
producerInstance;
+ ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>)
consumerInstance;
+
+ // An address with no listener behind it: dialing the redirect fails
immediately.
+ URI unreachable;
+ try (ServerSocket socket = new ServerSocket(0)) {
+ unreachable = URI.create("pulsar://127.0.0.1:" +
socket.getLocalPort());
+ }
+
+ // Deliver the disconnect+redirect exactly as
ClientCnx#handleCloseProducer and
+ // #handleCloseConsumer do for a broker unload notification.
+ producer.connectionClosed(producer.getClientCnx(), Optional.of(0L),
Optional.of(unreachable));
+ consumer.connectionClosed(consumer.getClientCnx(), Optional.of(0L),
Optional.of(unreachable));
+
+ // The redirect dial fails; the clients must recover by looking up the
topic again.
+ Awaitility.await().atMost(15, TimeUnit.SECONDS).untilAsserted(() -> {
+ assertEquals(producer.getState(), State.Ready);
+ assertEquals(consumer.getState(), State.Ready);
+ });
+
+ producer.send("msg".getBytes(UTF_8));
+ assertNotNull(consumer.receive(10, TimeUnit.SECONDS));
+ }
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
index e398b0583fa..5538feb8537 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java
@@ -227,9 +227,15 @@ public class ConnectionHandler {
public void connectionClosed(ClientCnx cnx, Optional<Long>
initialConnectionDelayMs, Optional<URI> hostUrl) {
lastConnectionClosedTimestamp = System.currentTimeMillis();
duringConnect.set(false);
- // Remember an explicit reconnect target so a later first-attempt
failure (reconnectLater)
- // re-dials the same broker rather than falling back to the service
URL.
- hostUrl.ifPresent(uri -> this.explicitHostURI = uri);
+ // Only handlers already pinned to an explicit host (v5 TC
metadata-store discovery) update
+ // the pin here, so a later first-attempt failure (reconnectLater)
re-dials the current
+ // leader rather than falling back to the service URL. For
lookup-based handlers
+ // (producers/consumers), the broker-assigned redirect in hostUrl may
be wrong or stale:
+ // it is honored for the immediate reconnect attempt below only, and
retries after a
+ // failure go through a fresh topic lookup instead of staying pinned
to it.
+ if (explicitHostURI != null) {
+ hostUrl.ifPresent(uri -> this.explicitHostURI = uri);
+ }
state.client.getCnxPool().releaseConnection(cnx);
if (CLIENT_CNX_UPDATER.compareAndSet(this, cnx, null)) {
if (!state.changeToConnecting()) {