This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a6b69d2199 [feat] PIP-468: V5 StreamConsumer disconnect / reconnect / past-grace reassignment (#25619)
1a6b69d2199 is described below

commit 1a6b69d2199e5c668e0babffc48300fafa81f980
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 06:18:38 2026 -0700

    [feat] PIP-468: V5 StreamConsumer disconnect / reconnect / past-grace reassignment (#25619)
---
 .../apache/pulsar/broker/ServiceConfiguration.java |   9 ++
 .../service/scalable/ScalableTopicController.java  |  17 +-
 .../pulsar/broker/service/SharedPulsarCluster.java |   4 +
 .../client/api/v5/V5MultipleConsumersTest.java     | 175 +++++++++++++++++++++
 4 files changed, 204 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d62cb1085ce..b22f9a4bef8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1298,6 +1298,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
     )
     private boolean scalableTopicsEnabled = true;
 
+    @FieldContext(
+            dynamic = false,
+            category = CATEGORY_POLICIES,
+            doc = "Grace period (seconds) the controller leader waits for a disconnected scalable-topic "
+                    + "consumer to reconnect with the same consumer name before evicting its session and "
+                    + "reassigning its segments to remaining consumers."
+    )
+    private int scalableTopicConsumerSessionGracePeriodSeconds = 60;
+
     @FieldContext(
             dynamic = false,
             category = CATEGORY_POLICIES,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 78a36c8d9d3..b63ee847e34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.service.scalable;
 
 import io.github.merlimat.slog.Logger;
+import java.time.Duration;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
@@ -157,12 +158,26 @@ public class ScalableTopicController {
     }
 
     private SubscriptionCoordinator createCoordinator(String subscription) {
+        // Defensive: PulsarService.getConfig() is null in some unit-test mocks. Fall
+        // back to the SubscriptionCoordinator's default grace period in that case.
+        var config = brokerService.getPulsar().getConfig();
+        if (config == null) {
+            return new SubscriptionCoordinator(
+                    subscription,
+                    topicName,
+                    currentLayout,
+                    resources,
+                    brokerService.getPulsar().getExecutor());
+        }
+        Duration gracePeriod = Duration.ofSeconds(
+                config.getScalableTopicConsumerSessionGracePeriodSeconds());
         return new SubscriptionCoordinator(
                 subscription,
                 topicName,
                 currentLayout,
                 resources,
-                brokerService.getPulsar().getExecutor());
+                brokerService.getPulsar().getExecutor(),
+                gracePeriod);
     }
 
     private CompletableFuture<Void> electLeader() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
index 83a725fc969..1866a5a9649 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -148,6 +148,10 @@ public class SharedPulsarCluster {
         config.setForceDeleteTenantAllowed(true);
         config.setBrokerDeleteInactiveTopicsEnabled(false);
         config.setBrokerDeduplicationEnabled(true);
+        // Tests rely on aggressive eviction of disconnected scalable-topic consumer
+        // sessions so reassignment-after-disconnect can be exercised in a few seconds
+        // instead of the production default of 60s.
+        config.setScalableTopicConsumerSessionGracePeriodSeconds(2);
 
         // Reduce thread pool sizes for faster startup (fewer threads to create)
         config.setNumIOThreads(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
index 11e71dfd069..5bcd5f127dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
@@ -257,4 +257,179 @@ public class V5MultipleConsumersTest extends V5ClientBaseTest {
         t.start();
         return t;
     }
+
+    /**
+     * A consumer that closes and re-subscribes with the same consumer name (within the
+     * controller grace period, before its session is evicted) must keep its segment
+     * assignment — the controller re-attaches the new subscribe to the existing
+     * session without rebalancing.
+     */
+    @Test
+    public void testStreamConsumerReconnectWithinGraceKeepsAssignment() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "stream-reconnect-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        StreamConsumer<String> alice = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("alice")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        @Cleanup
+        StreamConsumer<String> bob = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("bob")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // First batch: drain across both, capture each consumer's per-key share.
+        int firstN = 80;
+        Set<String> firstSent = new HashSet<>();
+        for (int i = 0; i < firstN; i++) {
+            String v = "first-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            firstSent.add(v);
+        }
+
+        Set<String> aliceFirst = ConcurrentHashMap.newKeySet();
+        Set<String> bobFirst = ConcurrentHashMap.newKeySet();
+        Set<String> received1 = ConcurrentHashMap.newKeySet();
+        Thread t1 = drainStreamTo(alice, received1, aliceFirst);
+        Thread t2 = drainStreamTo(bob, received1, bobFirst);
+        t1.join();
+        t2.join();
+        assertEquals(received1, firstSent, "first batch must be delivered exactly once");
+        assertTrue(!aliceFirst.isEmpty() && !bobFirst.isEmpty(),
+                "controller must split the first batch across alice + bob");
+
+        // Close alice's consumer and re-subscribe under the same name. The controller
+        // sees the existing session for "alice", attaches the new subscribe to it, and
+        // pushes back the same assignment without touching bob.
+        alice.close();
+        @Cleanup
+        StreamConsumer<String> aliceRejoined = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("alice")
+                .subscribe();
+
+        // Second batch: pin the same key set so each generation routes to the same
+        // segments — alice's per-key share must match across batches (modulo prefix).
+        int secondN = 80;
+        Set<String> secondSent = new HashSet<>();
+        for (int i = 0; i < secondN; i++) {
+            String v = "second-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            secondSent.add(v);
+        }
+
+        Set<String> aliceSecond = ConcurrentHashMap.newKeySet();
+        Set<String> bobSecond = ConcurrentHashMap.newKeySet();
+        Set<String> received2 = ConcurrentHashMap.newKeySet();
+        Thread r1 = drainStreamTo(aliceRejoined, received2, aliceSecond);
+        Thread r2 = drainStreamTo(bob, received2, bobSecond);
+        r1.join();
+        r2.join();
+        assertEquals(received2, secondSent, "second batch must be delivered exactly once");
+
+        Set<String> aliceFirstKeys = stripPrefix(aliceFirst, "first-");
+        Set<String> aliceSecondKeys = stripPrefix(aliceSecond, "second-");
+        assertEquals(aliceSecondKeys, aliceFirstKeys,
+                "alice must keep her segments after re-subscribe within grace");
+        Set<String> bobFirstKeys = stripPrefix(bobFirst, "first-");
+        Set<String> bobSecondKeys = stripPrefix(bobSecond, "second-");
+        assertEquals(bobSecondKeys, bobFirstKeys,
+                "bob must keep his segments unchanged when alice re-subscribes");
+    }
+
+    /**
+     * If a consumer disconnects and stays away past the controller grace period, its
+     * session is evicted and segments rebalanced to remaining peers. We force a
+     * disconnect by closing alice's dedicated client (consumer.close() alone doesn't
+     * sever the underlying connection), then trust the rebalance to land while bob
+     * drains the second batch.
+     */
+    @Test
+    public void testStreamConsumerDisconnectPastGraceTriggersReassignment() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "stream-evict-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // alice on her own client so we can sever the connection independently.
+        PulsarClient aliceClient = newV5Client();
+        StreamConsumer<String> alice = aliceClient.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("alice")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        @Cleanup
+        StreamConsumer<String> bob = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .consumerName("bob")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int firstN = 60;
+        for (int i = 0; i < firstN; i++) {
+            producer.newMessage().key("k-" + i).value("first-" + i).send();
+        }
+
+        Set<String> received1 = ConcurrentHashMap.newKeySet();
+        Set<String> aliceFirst = ConcurrentHashMap.newKeySet();
+        Set<String> bobFirst = ConcurrentHashMap.newKeySet();
+        Thread t1 = drainStreamTo(alice, received1, aliceFirst);
+        Thread t2 = drainStreamTo(bob, received1, bobFirst);
+        t1.join();
+        t2.join();
+        assertTrue(!aliceFirst.isEmpty() && !bobFirst.isEmpty(),
+                "controller must split first batch across alice + bob");
+
+        // Sever alice's connection. The broker arms the grace timer; once it fires the
+        // rebalance pushes a new assignment to bob covering every active segment.
+        aliceClient.close();
+
+        // Produce immediately and drain bob until we've received all second-batch
+        // messages — the rebalance will land mid-drain and bob picks up alice's
+        // segments. Cap with a generous total deadline so a regression doesn't hang.
+        int secondN = 60;
+        Set<String> secondSent = new HashSet<>();
+        for (int i = 0; i < secondN; i++) {
+            String v = "second-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            secondSent.add(v);
+        }
+
+        Set<String> bobSecond = new HashSet<>();
+        long deadline = System.nanoTime() + 30_000_000_000L;
+        while (bobSecond.size() < secondN && System.nanoTime() - deadline < 0) {
+            Message<String> msg = bob.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                bobSecond.add(msg.value());
+            }
+        }
+        assertEquals(bobSecond, secondSent,
+                "after past-grace eviction bob must end up serving every segment");
+    }
+
+    private static Set<String> stripPrefix(Set<String> values, String prefix) {
+        Set<String> out = new HashSet<>(values.size());
+        for (String v : values) {
+            if (v.startsWith(prefix)) {
+                out.add(v.substring(prefix.length()));
+            }
+        }
+        return out;
+    }
 }

Reply via email to