This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new cd910c27f06 [feat] PIP-468: Auto-reconnect scalable consumer session on disconnect (#25623)
cd910c27f06 is described below

commit cd910c27f06175d4ea4be96bb0f857be491fb6d8
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 07:41:58 2026 -0700

    [feat] PIP-468: Auto-reconnect scalable consumer session on disconnect (#25623)
---
 .../api/v5/V5StreamConsumerAutoReconnectTest.java  | 259 +++++++++++++++++++++
 .../client/impl/v5/ScalableConsumerClient.java     | 129 ++++++++--
 2 files changed, 374 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerAutoReconnectTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerAutoReconnectTest.java
new file mode 100644
index 00000000000..a5e335c56b6
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerAutoReconnectTest.java
@@ -0,0 +1,259 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for automatic reconnect on the scalable consumer's controller 
session.
+ *
+ * <p>A V5 StreamConsumer keeps a long-lived subscription against the topic
+ * controller — that's how it learns about assignment changes (split / merge /
+ * peer join / peer leave). When the underlying connection drops mid-life
+ * (broker bounce, network blip, container restart) the per-segment v4 
consumers
+ * have their own reconnect logic, but the controller session needs its own.
+ *
+ * <p>These tests force-close the controller channel and assert:
+ * <ul>
+ *   <li>the consumer continues to deliver messages without the application
+ *       having to do anything; and</li>
+ *   <li>within the controller's grace window the assignment is preserved, so a
+ *       second consumer that didn't drop sees no reshuffling.</li>
+ * </ul>
+ */
+public class V5StreamConsumerAutoReconnectTest extends V5ClientBaseTest {
+
+    /**
+     * Force-close the controller channel underneath a single stream consumer 
and
+     * verify it continues to receive messages produced after the disconnect. 
The
+     * v5 layer must reattach to the controller and resume receiving without 
any
+     * application-visible interruption.
+     */
+    @Test
+    public void testStreamConsumerSurvivesConnectionDrop() throws Exception {
+        String topic = newScalableTopic(2);
+        String subscription = "auto-reconnect-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("solo")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Pre-disconnect batch: prove the consumer is healthy before we sever.
+        int firstN = 20;
+        Set<String> firstSent = new HashSet<>();
+        for (int i = 0; i < firstN; i++) {
+            String v = "first-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            firstSent.add(v);
+        }
+        Set<String> firstReceived = drain(consumer, firstN);
+        assertEquals(firstReceived, firstSent, "first batch must arrive in 
full before disconnect");
+
+        // Sever the controller channel. The cnx layer fires 
connectionClosed() on
+        // the scalable session, which kicks off the reconnect path with 
backoff.
+        forceCloseControllerChannel(consumer);
+
+        // Post-disconnect batch: produced after the channel close. The 
consumer
+        // must auto-reconnect to the controller and resume serving its 
segments.
+        int secondN = 20;
+        Set<String> secondSent = new HashSet<>();
+        for (int i = 0; i < secondN; i++) {
+            String v = "second-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            secondSent.add(v);
+        }
+        Set<String> secondReceived = drain(consumer, secondN);
+        assertEquals(secondReceived, secondSent,
+                "consumer must resume receiving after auto-reconnect");
+    }
+
+    /**
+     * Two stream consumers share the topic, then alice's controller channel is
+     * forcibly closed. Within the grace window (2s in the test cluster) 
alice's
+     * auto-reconnect should re-attach to the existing session and the same
+     * assignment, so bob's per-key share is unchanged across batches.
+     */
+    @Test
+    public void testReconnectWithinGracePreservesAssignment() throws Exception 
{
+        String topic = newScalableTopic(4);
+        String subscription = "auto-reconnect-grace-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        @Cleanup
+        StreamConsumer<String> alice = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("alice")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        @Cleanup
+        StreamConsumer<String> bob = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("bob")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Baseline batch: capture each consumer's per-key share before the 
drop.
+        int batchN = 60;
+        Set<String> firstSent = new HashSet<>();
+        for (int i = 0; i < batchN; i++) {
+            String v = "first-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            firstSent.add(v);
+        }
+        Set<String> aliceFirst = new HashSet<>();
+        Set<String> bobFirst = new HashSet<>();
+        Set<String> received1 = new HashSet<>();
+        drainBoth(alice, bob, batchN, received1, aliceFirst, bobFirst);
+        assertEquals(received1, firstSent, "first batch must be delivered 
exactly once");
+
+        // Sever alice's controller channel. The reconnect must land well 
within
+        // the 2s grace window — the controller still has alice's session and
+        // re-attaches the new connection to the existing assignment.
+        forceCloseControllerChannel(alice);
+
+        int secondN = 60;
+        Set<String> secondSent = new HashSet<>();
+        for (int i = 0; i < secondN; i++) {
+            String v = "second-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            secondSent.add(v);
+        }
+        Set<String> aliceSecond = new HashSet<>();
+        Set<String> bobSecond = new HashSet<>();
+        Set<String> received2 = new HashSet<>();
+        drainBoth(alice, bob, secondN, received2, aliceSecond, bobSecond);
+        assertEquals(received2, secondSent, "second batch must be delivered 
exactly once");
+
+        // Same key set, same routing → each consumer's per-key share must be
+        // identical across batches (modulo the prefix). That holds only if 
alice
+        // re-attached to her existing session — i.e. the reconnect landed 
within
+        // grace and no rebalance happened.
+        assertEquals(stripPrefix(aliceSecond, "second-"), 
stripPrefix(aliceFirst, "first-"),
+                "alice must keep her segments after auto-reconnect within 
grace");
+        assertEquals(stripPrefix(bobSecond, "second-"), stripPrefix(bobFirst, 
"first-"),
+                "bob must be unaffected by alice's reconnect");
+    }
+
+    // --- Helpers ---
+
+    /**
+     * Drain at most {@code expected} messages from {@code consumer}, capping 
at a
+     * generous deadline so a regression doesn't hang the suite.
+     */
+    private Set<String> drain(StreamConsumer<String> consumer, int expected) 
throws Exception {
+        Set<String> received = new HashSet<>();
+        MessageId last = null;
+        long deadline = System.currentTimeMillis() + 30_000L;
+        while (received.size() < expected && System.currentTimeMillis() < 
deadline) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                received.add(msg.value());
+                last = msg.id();
+            }
+        }
+        if (last != null) {
+            consumer.acknowledgeCumulative(last);
+        }
+        return received;
+    }
+
+    /**
+     * Drain a known number of messages across two consumers, populating each
+     * consumer's per-message set. Stops once {@code allReceived} reaches
+     * {@code expected} or the deadline fires.
+     */
+    private void drainBoth(StreamConsumer<String> a, StreamConsumer<String> b,
+                           int expected, Set<String> allReceived,
+                           Set<String> aGot, Set<String> bGot) throws 
Exception {
+        long deadline = System.currentTimeMillis() + 30_000L;
+        MessageId aLast = null;
+        MessageId bLast = null;
+        while (allReceived.size() < expected && System.currentTimeMillis() < 
deadline) {
+            Message<String> ma = a.receive(Duration.ofMillis(200));
+            if (ma != null) {
+                allReceived.add(ma.value());
+                aGot.add(ma.value());
+                aLast = ma.id();
+            }
+            Message<String> mb = b.receive(Duration.ofMillis(200));
+            if (mb != null) {
+                allReceived.add(mb.value());
+                bGot.add(mb.value());
+                bLast = mb.id();
+            }
+        }
+        if (aLast != null) {
+            a.acknowledgeCumulative(aLast);
+        }
+        if (bLast != null) {
+            b.acknowledgeCumulative(bLast);
+        }
+    }
+
+    private static Set<String> stripPrefix(Set<String> values, String prefix) {
+        Set<String> out = new HashSet<>(values.size());
+        for (String v : values) {
+            if (v.startsWith(prefix)) {
+                out.add(v.substring(prefix.length()));
+            }
+        }
+        return out;
+    }
+
+    /**
+     * Force-close the controller channel underneath a stream consumer. Reaches
+     * into the v5 internals via reflection because the {@code session} field 
on
+     * {@code ScalableStreamConsumer} and the test hook on
+     * {@code ScalableConsumerClient} are package-private.
+     */
+    private static void forceCloseControllerChannel(StreamConsumer<?> 
consumer) throws Exception {
+        Field sessionField = consumer.getClass().getDeclaredField("session");
+        sessionField.setAccessible(true);
+        Object session = sessionField.get(consumer);
+        assertNotNull(session, "expected session on stream consumer");
+        Method m = 
session.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
+        m.setAccessible(true);
+        m.invoke(session);
+    }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
index 08c3643c601..30b6d32872b 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -21,10 +21,12 @@ package org.apache.pulsar.client.impl.v5;
 import io.github.merlimat.slog.Logger;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.ClientCnx;
@@ -38,6 +40,7 @@ import 
org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.scalable.HashRange;
+import org.apache.pulsar.common.util.Backoff;
 
 /**
  * Client-side session for a scalable-topic consumer (Stream or Checkpoint).
@@ -51,6 +54,13 @@ import org.apache.pulsar.common.scalable.HashRange;
  * <p>Mirrors {@link DagWatchClient} in shape; the wire path is different — 
subscribe
  * is request/response (matched by request id) and updates are tagged with the
  * client-chosen consumer id.
+ *
+ * <p><b>Reconnect.</b> On {@link #connectionClosed} after the initial 
assignment has
+ * arrived, the session re-runs lookup → connect → register → subscribe with 
backoff
+ * and feeds the response back through {@link #onAssignmentUpdate}. Within the
+ * controller's grace period the broker re-attaches the existing registration 
and
+ * returns the same assignment (no listener-visible change); past grace the 
controller
+ * rebalances and the listener applies the diff.
  */
 final class ScalableConsumerClient implements ScalableConsumerSession, 
AutoCloseable {
 
@@ -63,6 +73,7 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
     private final String consumerName;
     private final long consumerId;
     private final ScalableConsumerType consumerType;
+    private final Backoff reconnectBackoff;
 
     private final AtomicReference<List<ActiveSegment>> currentAssignment =
             new AtomicReference<>(List.of());
@@ -84,6 +95,10 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
         this.consumerName = consumerName;
         this.consumerId = v4Client.newConsumerId();
         this.consumerType = consumerType;
+        this.reconnectBackoff = Backoff.builder()
+                .initialDelay(Duration.ofMillis(100))
+                .maxBackoff(Duration.ofSeconds(30))
+                .build();
         this.log = LOG.with()
                 .attr("topic", topicName)
                 .attr("subscription", subscription)
@@ -99,6 +114,29 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
      * complete with the initial assignment.
      */
     CompletableFuture<List<ActiveSegment>> start() {
+        // One-shot initial subscribe. Unlike reconnect(), a failure here is not
+        // retried: it is propagated to the caller through initialAssignmentFuture.
+        // NOTE(review): the code this replaces only stored the segments and epoch on
+        // the initial response; applyAssignment() may additionally invoke the
+        // assignment-change listener. Confirm no listener is registered before start()
+        // completes, otherwise the initial segments could also be reported to it.
+        connectAndSubscribe()
+                .thenAccept(assignment -> {
+                    // applyAssignment() updates currentEpoch, which the log line below reads.
+                    List<ActiveSegment> segments = applyAssignment(assignment);
+                    log.info().attr("epoch", currentEpoch)
+                            .attr("segments", segments.size())
+                            .log("Initial assignment received");
+                    initialAssignmentFuture.complete(segments);
+                })
+                .exceptionally(ex -> {
+                    initialAssignmentFuture.completeExceptionally(ex);
+                    return null;
+                });
+        return initialAssignmentFuture;
+    }
+
+    /**
+     * Open a connection to the controller, register this session, and send the
+     * subscribe command. Resolves with the assignment returned by the 
controller, or
+     * fails if any step (lookup, connect, write, or response) fails.
+     */
+    private CompletableFuture<ScalableConsumerAssignment> 
connectAndSubscribe() {
+        CompletableFuture<ScalableConsumerAssignment> result = new 
CompletableFuture<>();
+
         DagWatchClient watch = new DagWatchClient(v4Client, topicName);
         watch.start()
                 .thenCompose(layout -> {
@@ -125,9 +163,14 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
                     watch.close();
                 })
                 .thenAccept(cnx -> {
+                    if (closed) {
+                        result.completeExceptionally(
+                                new 
PulsarClientException.AlreadyClosedException("Session closed"));
+                        return;
+                    }
                     this.cnx = cnx;
                     if (!cnx.isSupportsScalableTopics()) {
-                        initialAssignmentFuture.completeExceptionally(
+                        result.completeExceptionally(
                                 new 
PulsarClientException.FeatureNotSupportedException(
                                         "Broker does not support scalable 
topics",
                                         
PulsarClientException.FailedFeatureCheck.SupportsScalableTopics));
@@ -150,30 +193,24 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
                                 if (!writeFuture.isSuccess()) {
                                     cnx.getPendingRequests().remove(requestId);
                                     
cnx.removeScalableConsumerSession(consumerId);
-                                    
initialAssignmentFuture.completeExceptionally(
+                                    result.completeExceptionally(
                                             new 
PulsarClientException(writeFuture.cause()));
                                 }
                             });
 
                     responseFuture.whenComplete((assignment, ex) -> {
                         if (ex != null) {
-                            initialAssignmentFuture.completeExceptionally(ex);
-                            return;
+                            result.completeExceptionally(ex);
+                        } else {
+                            result.complete(assignment);
                         }
-                        List<ActiveSegment> segments = 
toSegmentList(assignment);
-                        currentAssignment.set(segments);
-                        currentEpoch = assignment.getLayoutEpoch();
-                        log.info().attr("epoch", currentEpoch)
-                                .attr("segments", segments.size())
-                                .log("Initial assignment received");
-                        initialAssignmentFuture.complete(segments);
                     });
                 })
                 .exceptionally(ex -> {
-                    initialAssignmentFuture.completeExceptionally(ex);
+                    result.completeExceptionally(ex);
                     return null;
                 });
-        return initialAssignmentFuture;
+        return result;
     }
 
     @Override
@@ -187,6 +224,16 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
                     .log("Ignoring stale assignment update");
             return;
         }
+        applyAssignment(assignment);
+    }
+
+    /**
+     * Update the current assignment + epoch and notify the listener. Returns 
the new
+     * segment list. Caller is responsible for the staleness check; this 
method always
+     * applies what it's given.
+     */
+    private List<ActiveSegment> applyAssignment(ScalableConsumerAssignment 
assignment) {
+        long epoch = assignment.getLayoutEpoch();
         List<ActiveSegment> newSegments = toSegmentList(assignment);
         List<ActiveSegment> oldSegments = 
currentAssignment.getAndSet(newSegments);
         currentEpoch = epoch;
@@ -201,17 +248,59 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
                 log.error().exception(e).log("Error in assignment change 
listener");
             }
         }
+        return newSegments;
     }
 
+    /**
+     * Called when the connection carrying this session closes. Clears the cached
+     * {@code cnx}, then: returns immediately if the session was closed locally;
+     * fails {@code initialAssignmentFuture} if the initial subscribe had not completed;
+     * otherwise schedules a background reconnect with backoff.
+     *
+     * <p>NOTE(review): if this fires while a reconnect attempt is still waiting for its
+     * subscribe response on the connection that just closed, that attempt presumably
+     * fails as well and schedules its own retry, so two reconnect chains could run side
+     * by side. Confirm against ClientCnx's channel-close handling and consider guarding
+     * with an "attempt in progress" flag.
+     */
     @Override
     public void connectionClosed() {
         log.warn("Scalable consumer session connection closed");
+        // Drop the dead connection; connectAndSubscribe() installs a fresh one.
         cnx = null;
+        if (closed) {
+            return;
+        }
         if (!initialAssignmentFuture.isDone()) {
+            // Initial subscribe never completed: surface the failure to the caller of
+            // start() rather than retrying silently. Automatic reconnect only applies
+            // once an initial assignment has been delivered.
             initialAssignmentFuture.completeExceptionally(
                     new PulsarClientException("Connection closed before 
initial assignment arrived"));
+            return;
+        }
+        // The session was live: recover in the background.
+        scheduleReconnect();
+    }
+
+    /**
+     * Arrange for one reconnect() call on the client timer after the next backoff
+     * delay (the backoff is built with a 100 ms initial delay and a 30 s cap). No-op
+     * once the session is closed.
+     *
+     * <p>NOTE(review): reachable from both connectionClosed() and reconnect()'s failure
+     * path, possibly on different threads; confirm Backoff tolerates concurrent
+     * next()/reset() calls.
+     */
+    private void scheduleReconnect() {
+        if (closed) {
+            return;
         }
-        // TODO: implement automatic re-subscribe on reconnect
+        // Advances the backoff; reconnect() resets it after a successful attempt.
+        long delayMs = reconnectBackoff.next().toMillis();
+        log.info().attr("delayMs", delayMs).log("Scheduling reconnect");
+        v4Client.timer().newTimeout(timeout -> reconnect(),
+                delayMs, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Run one reconnect attempt: repeat lookup/connect/register/subscribe via
+     * connectAndSubscribe() and feed the returned assignment through
+     * onAssignmentUpdate(). Success resets the backoff; failure logs and schedules the
+     * next attempt. The result is not applied if the session was closed meanwhile.
+     *
+     * <p>NOTE(review): onAssignmentUpdate() applies a staleness check whose condition is
+     * not visible here. If it rejects an assignment whose layout epoch equals the
+     * current one, a post-grace rebalance that keeps the epoch unchanged would be
+     * silently dropped — confirm the comparison.
+     */
+    private void reconnect() {
+        if (closed) {
+            return;
+        }
+        connectAndSubscribe()
+                .thenAccept(assignment -> {
+                    if (closed) {
+                        return;
+                    }
+                    // Feed the response through the standard update path so the
+                    // listener gets the diff. Within grace this is a no-op (same
+                    // segments); past grace the controller has rebalanced and the
+                    // listener attaches/detaches accordingly.
+                    onAssignmentUpdate(assignment);
+                    reconnectBackoff.reset();
+                    log.info().log("Reconnect succeeded");
+                })
+                .exceptionally(ex -> {
+                    // NOTE(review): if this attempt failed because its connection dropped
+                    // after the session was registered on it, connectionClosed() has
+                    // presumably already called scheduleReconnect() as well, so this
+                    // retry may duplicate one already pending.
+                    log.warn().exceptionMessage(ex).log("Reconnect failed; 
will retry");
+                    scheduleReconnect();
+                    return null;
+                });
+    }
 
     private static List<ActiveSegment> 
toSegmentList(ScalableConsumerAssignment assignment) {
@@ -238,6 +327,18 @@ final class ScalableConsumerClient implements 
ScalableConsumerSession, AutoClose
         return consumerId;
     }
 
+    /**
+     * Test-only hook that simulates a network drop by closing the broker channel
+     * behind this session, if one is currently attached; does nothing otherwise.
+     * The connection layer is then expected to report the loss through
+     * {@link #connectionClosed()}, the same path a real disconnect takes.
+     * Package-private; cross-module tests invoke it via reflection.
+     */
+    void forceCloseConnectionForTesting() {
+        ClientCnx current = this.cnx;
+        if (current == null) {
+            return;
+        }
+        current.ctx().channel().close();
+    }
+
     @Override
     public void close() {
         if (closed) {

Reply via email to