This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new cd910c27f06 [feat] PIP-468: Auto-reconnect scalable consumer session
on disconnect (#25623)
cd910c27f06 is described below
commit cd910c27f06175d4ea4be96bb0f857be491fb6d8
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 07:41:58 2026 -0700
[feat] PIP-468: Auto-reconnect scalable consumer session on disconnect
(#25623)
---
.../api/v5/V5StreamConsumerAutoReconnectTest.java | 259 +++++++++++++++++++++
.../client/impl/v5/ScalableConsumerClient.java | 129 ++++++++--
2 files changed, 374 insertions(+), 14 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerAutoReconnectTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerAutoReconnectTest.java
new file mode 100644
index 00000000000..a5e335c56b6
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerAutoReconnectTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for automatic reconnect on the scalable consumer's controller
session.
+ *
+ * <p>A V5 StreamConsumer keeps a long-lived subscription against the topic
+ * controller — that's how it learns about assignment changes (split / merge /
+ * peer join / peer leave). When the underlying connection drops mid-life
+ * (broker bounce, network blip, container restart) the per-segment v4
consumers
+ * have their own reconnect logic, but the controller session needs its own.
+ *
+ * <p>These tests force-close the controller channel and assert:
+ * <ul>
+ * <li>the consumer continues to deliver messages without the application
+ * having to do anything; and</li>
+ * <li>within the controller's grace window the assignment is preserved, so a
+ * second consumer that didn't drop sees no reshuffling.</li>
+ * </ul>
+ */
+public class V5StreamConsumerAutoReconnectTest extends V5ClientBaseTest {
+
+ /**
+ * Force-close the controller channel underneath a single stream consumer
and
+ * verify it continues to receive messages produced after the disconnect.
The
+ * v5 layer must reattach to the controller and resume receiving without
any
+ * application-visible interruption.
+ */
+ @Test
+ public void testStreamConsumerSurvivesConnectionDrop() throws Exception {
+ String topic = newScalableTopic(2);
+ String subscription = "auto-reconnect-sub";
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("solo")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // Pre-disconnect batch: prove the consumer is healthy before we sever.
+ int firstN = 20;
+ Set<String> firstSent = new HashSet<>();
+ for (int i = 0; i < firstN; i++) {
+ String v = "first-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ firstSent.add(v);
+ }
+ Set<String> firstReceived = drain(consumer, firstN);
+ assertEquals(firstReceived, firstSent, "first batch must arrive in
full before disconnect");
+
+ // Sever the controller channel. The cnx layer fires
connectionClosed() on
+ // the scalable session, which kicks off the reconnect path with
backoff.
+ forceCloseControllerChannel(consumer);
+
+ // Post-disconnect batch: produced after the channel close. The
consumer
+ // must auto-reconnect to the controller and resume serving its
segments.
+ int secondN = 20;
+ Set<String> secondSent = new HashSet<>();
+ for (int i = 0; i < secondN; i++) {
+ String v = "second-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ secondSent.add(v);
+ }
+ Set<String> secondReceived = drain(consumer, secondN);
+ assertEquals(secondReceived, secondSent,
+ "consumer must resume receiving after auto-reconnect");
+ }
+
+ /**
+ * Two stream consumers share the topic, then alice's controller channel is
+ * forcibly closed. Within the grace window (2s in the test cluster)
alice's
+ * auto-reconnect should re-attach to the existing session and the same
+ * assignment, so bob's per-key share is unchanged across batches.
+ */
+ @Test
+ public void testReconnectWithinGracePreservesAssignment() throws Exception
{
+ String topic = newScalableTopic(4);
+ String subscription = "auto-reconnect-grace-sub";
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ @Cleanup
+ StreamConsumer<String> alice =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("alice")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ @Cleanup
+ StreamConsumer<String> bob =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("bob")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // Baseline batch: capture each consumer's per-key share before the
drop.
+ int batchN = 60;
+ Set<String> firstSent = new HashSet<>();
+ for (int i = 0; i < batchN; i++) {
+ String v = "first-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ firstSent.add(v);
+ }
+ Set<String> aliceFirst = new HashSet<>();
+ Set<String> bobFirst = new HashSet<>();
+ Set<String> received1 = new HashSet<>();
+ drainBoth(alice, bob, batchN, received1, aliceFirst, bobFirst);
+ assertEquals(received1, firstSent, "first batch must be delivered
exactly once");
+
+ // Sever alice's controller channel. The reconnect must land well
within
+ // the 2s grace window — the controller still has alice's session and
+ // re-attaches the new connection to the existing assignment.
+ forceCloseControllerChannel(alice);
+
+ int secondN = 60;
+ Set<String> secondSent = new HashSet<>();
+ for (int i = 0; i < secondN; i++) {
+ String v = "second-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ secondSent.add(v);
+ }
+ Set<String> aliceSecond = new HashSet<>();
+ Set<String> bobSecond = new HashSet<>();
+ Set<String> received2 = new HashSet<>();
+ drainBoth(alice, bob, secondN, received2, aliceSecond, bobSecond);
+ assertEquals(received2, secondSent, "second batch must be delivered
exactly once");
+
+ // Same key set, same routing → each consumer's per-key share must be
+ // identical across batches (modulo the prefix). That holds only if
alice
+ // re-attached to her existing session — i.e. the reconnect landed
within
+ // grace and no rebalance happened.
+ assertEquals(stripPrefix(aliceSecond, "second-"),
stripPrefix(aliceFirst, "first-"),
+ "alice must keep her segments after auto-reconnect within
grace");
+ assertEquals(stripPrefix(bobSecond, "second-"), stripPrefix(bobFirst,
"first-"),
+ "bob must be unaffected by alice's reconnect");
+ }
+
+ // --- Helpers ---
+
+ /**
+ * Drain at most {@code expected} messages from {@code consumer}, capping
at a
+ * generous deadline so a regression doesn't hang the suite.
+ */
+ private Set<String> drain(StreamConsumer<String> consumer, int expected)
throws Exception {
+ Set<String> received = new HashSet<>();
+ MessageId last = null;
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < expected && System.currentTimeMillis() <
deadline) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ last = msg.id();
+ }
+ }
+ if (last != null) {
+ consumer.acknowledgeCumulative(last);
+ }
+ return received;
+ }
+
+ /**
+ * Drain a known number of messages across two consumers, populating each
+ * consumer's per-message set. Stops once {@code allReceived} reaches
+ * {@code expected} or the deadline fires.
+ */
+ private void drainBoth(StreamConsumer<String> a, StreamConsumer<String> b,
+ int expected, Set<String> allReceived,
+ Set<String> aGot, Set<String> bGot) throws
Exception {
+ long deadline = System.currentTimeMillis() + 30_000L;
+ MessageId aLast = null;
+ MessageId bLast = null;
+ while (allReceived.size() < expected && System.currentTimeMillis() <
deadline) {
+ Message<String> ma = a.receive(Duration.ofMillis(200));
+ if (ma != null) {
+ allReceived.add(ma.value());
+ aGot.add(ma.value());
+ aLast = ma.id();
+ }
+ Message<String> mb = b.receive(Duration.ofMillis(200));
+ if (mb != null) {
+ allReceived.add(mb.value());
+ bGot.add(mb.value());
+ bLast = mb.id();
+ }
+ }
+ if (aLast != null) {
+ a.acknowledgeCumulative(aLast);
+ }
+ if (bLast != null) {
+ b.acknowledgeCumulative(bLast);
+ }
+ }
+
+ private static Set<String> stripPrefix(Set<String> values, String prefix) {
+ Set<String> out = new HashSet<>(values.size());
+ for (String v : values) {
+ if (v.startsWith(prefix)) {
+ out.add(v.substring(prefix.length()));
+ }
+ }
+ return out;
+ }
+
+ /**
+ * Force-close the controller channel underneath a stream consumer. Reaches
+ * into the v5 internals via reflection because the {@code session} field
on
+ * {@code ScalableStreamConsumer} and the test hook on
+ * {@code ScalableConsumerClient} are package-private.
+ */
+ private static void forceCloseControllerChannel(StreamConsumer<?>
consumer) throws Exception {
+ Field sessionField = consumer.getClass().getDeclaredField("session");
+ sessionField.setAccessible(true);
+ Object session = sessionField.get(consumer);
+ assertNotNull(session, "expected session on stream consumer");
+ Method m =
session.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
+ m.setAccessible(true);
+ m.invoke(session);
+ }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
index 08c3643c601..30b6d32872b 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -21,10 +21,12 @@ package org.apache.pulsar.client.impl.v5;
import io.github.merlimat.slog.Logger;
import java.net.InetSocketAddress;
import java.net.URI;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.ClientCnx;
@@ -38,6 +40,7 @@ import
org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.scalable.HashRange;
+import org.apache.pulsar.common.util.Backoff;
/**
* Client-side session for a scalable-topic consumer (Stream or Checkpoint).
@@ -51,6 +54,13 @@ import org.apache.pulsar.common.scalable.HashRange;
* <p>Mirrors {@link DagWatchClient} in shape; the wire path is different —
subscribe
* is request/response (matched by request id) and updates are tagged with the
* client-chosen consumer id.
+ *
+ * <p><b>Reconnect.</b> On {@link #connectionClosed} after the initial
assignment has
+ * arrived, the session re-runs lookup → connect → register → subscribe with
backoff
+ * and feeds the response back through {@link #onAssignmentUpdate}. Within the
+ * controller's grace period the broker re-attaches the existing registration
and
+ * returns the same assignment (no listener-visible change); past grace the
controller
+ * rebalances and the listener applies the diff.
*/
final class ScalableConsumerClient implements ScalableConsumerSession,
AutoCloseable {
@@ -63,6 +73,7 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
private final String consumerName;
private final long consumerId;
private final ScalableConsumerType consumerType;
+ private final Backoff reconnectBackoff;
private final AtomicReference<List<ActiveSegment>> currentAssignment =
new AtomicReference<>(List.of());
@@ -84,6 +95,10 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
this.consumerName = consumerName;
this.consumerId = v4Client.newConsumerId();
this.consumerType = consumerType;
+ this.reconnectBackoff = Backoff.builder()
+ .initialDelay(Duration.ofMillis(100))
+ .maxBackoff(Duration.ofSeconds(30))
+ .build();
this.log = LOG.with()
.attr("topic", topicName)
.attr("subscription", subscription)
@@ -99,6 +114,29 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
* complete with the initial assignment.
*/
CompletableFuture<List<ActiveSegment>> start() {
+ connectAndSubscribe()
+ .thenAccept(assignment -> {
+ List<ActiveSegment> segments = applyAssignment(assignment);
+ log.info().attr("epoch", currentEpoch)
+ .attr("segments", segments.size())
+ .log("Initial assignment received");
+ initialAssignmentFuture.complete(segments);
+ })
+ .exceptionally(ex -> {
+ initialAssignmentFuture.completeExceptionally(ex);
+ return null;
+ });
+ return initialAssignmentFuture;
+ }
+
+ /**
+ * Open a connection to the controller, register this session, and send the
+ * subscribe command. Resolves with the assignment returned by the
controller, or
+ * fails if any step (lookup, connect, write, or response) fails.
+ */
+ private CompletableFuture<ScalableConsumerAssignment>
connectAndSubscribe() {
+ CompletableFuture<ScalableConsumerAssignment> result = new
CompletableFuture<>();
+
DagWatchClient watch = new DagWatchClient(v4Client, topicName);
watch.start()
.thenCompose(layout -> {
@@ -125,9 +163,14 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
watch.close();
})
.thenAccept(cnx -> {
+ if (closed) {
+ result.completeExceptionally(
+ new
PulsarClientException.AlreadyClosedException("Session closed"));
+ return;
+ }
this.cnx = cnx;
if (!cnx.isSupportsScalableTopics()) {
- initialAssignmentFuture.completeExceptionally(
+ result.completeExceptionally(
new
PulsarClientException.FeatureNotSupportedException(
"Broker does not support scalable
topics",
PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
@@ -150,30 +193,24 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
if (!writeFuture.isSuccess()) {
cnx.getPendingRequests().remove(requestId);
cnx.removeScalableConsumerSession(consumerId);
-
initialAssignmentFuture.completeExceptionally(
+ result.completeExceptionally(
new
PulsarClientException(writeFuture.cause()));
}
});
responseFuture.whenComplete((assignment, ex) -> {
if (ex != null) {
- initialAssignmentFuture.completeExceptionally(ex);
- return;
+ result.completeExceptionally(ex);
+ } else {
+ result.complete(assignment);
}
- List<ActiveSegment> segments =
toSegmentList(assignment);
- currentAssignment.set(segments);
- currentEpoch = assignment.getLayoutEpoch();
- log.info().attr("epoch", currentEpoch)
- .attr("segments", segments.size())
- .log("Initial assignment received");
- initialAssignmentFuture.complete(segments);
});
})
.exceptionally(ex -> {
- initialAssignmentFuture.completeExceptionally(ex);
+ result.completeExceptionally(ex);
return null;
});
- return initialAssignmentFuture;
+ return result;
}
@Override
@@ -187,6 +224,16 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
.log("Ignoring stale assignment update");
return;
}
+ applyAssignment(assignment);
+ }
+
+ /**
+ * Update the current assignment + epoch and notify the listener. Returns
the new
+ * segment list. Caller is responsible for the staleness check; this
method always
+ * applies what it's given.
+ */
+ private List<ActiveSegment> applyAssignment(ScalableConsumerAssignment
assignment) {
+ long epoch = assignment.getLayoutEpoch();
List<ActiveSegment> newSegments = toSegmentList(assignment);
List<ActiveSegment> oldSegments =
currentAssignment.getAndSet(newSegments);
currentEpoch = epoch;
@@ -201,17 +248,59 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
log.error().exception(e).log("Error in assignment change
listener");
}
}
+ return newSegments;
}
@Override
public void connectionClosed() {
log.warn("Scalable consumer session connection closed");
cnx = null;
+ if (closed) {
+ return;
+ }
if (!initialAssignmentFuture.isDone()) {
+ // Initial subscribe never completed: surface the failure to the
caller
+ // rather than retrying silently. The application chose to fail the
+ // subscribe, so failing here matches that expectation.
initialAssignmentFuture.completeExceptionally(
new PulsarClientException("Connection closed before
initial assignment arrived"));
+ return;
+ }
+ scheduleReconnect();
+ }
+
+ private void scheduleReconnect() {
+ if (closed) {
+ return;
}
- // TODO: implement automatic re-subscribe on reconnect
+ long delayMs = reconnectBackoff.next().toMillis();
+ log.info().attr("delayMs", delayMs).log("Scheduling reconnect");
+ v4Client.timer().newTimeout(timeout -> reconnect(),
+ delayMs, TimeUnit.MILLISECONDS);
+ }
+
+ private void reconnect() {
+ if (closed) {
+ return;
+ }
+ connectAndSubscribe()
+ .thenAccept(assignment -> {
+ if (closed) {
+ return;
+ }
+ // Feed the response through the standard update path so
the
+ // listener gets the diff. Within grace this is a no-op
(same
+ // segments); past grace the controller has rebalanced and
the
+ // listener attaches/detaches accordingly.
+ onAssignmentUpdate(assignment);
+ reconnectBackoff.reset();
+ log.info().log("Reconnect succeeded");
+ })
+ .exceptionally(ex -> {
+ log.warn().exceptionMessage(ex).log("Reconnect failed;
will retry");
+ scheduleReconnect();
+ return null;
+ });
}
private static List<ActiveSegment>
toSegmentList(ScalableConsumerAssignment assignment) {
@@ -238,6 +327,18 @@ final class ScalableConsumerClient implements
ScalableConsumerSession, AutoClose
return consumerId;
}
+ /**
+ * Test hook: forcibly close the underlying broker channel to simulate a
network
+ * drop. The cnx layer will fire {@link #connectionClosed()} which
triggers the
+ * automatic reconnect path. Reached via reflection from cross-module
tests.
+ */
+ void forceCloseConnectionForTesting() {
+ ClientCnx c = cnx;
+ if (c != null) {
+ c.ctx().channel().close();
+ }
+ }
+
@Override
public void close() {
if (closed) {