This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1a6b69d2199 [feat] PIP-468: V5 StreamConsumer disconnect / reconnect / past-grace reassignment (#25619)
1a6b69d2199 is described below
commit 1a6b69d2199e5c668e0babffc48300fafa81f980
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 06:18:38 2026 -0700
[feat] PIP-468: V5 StreamConsumer disconnect / reconnect / past-grace reassignment (#25619)
---
.../apache/pulsar/broker/ServiceConfiguration.java | 9 ++
.../service/scalable/ScalableTopicController.java | 17 +-
.../pulsar/broker/service/SharedPulsarCluster.java | 4 +
.../client/api/v5/V5MultipleConsumersTest.java | 175 +++++++++++++++++++++
4 files changed, 204 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index d62cb1085ce..b22f9a4bef8 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1298,6 +1298,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private boolean scalableTopicsEnabled = true;
+ @FieldContext(
+ dynamic = false,
+ category = CATEGORY_POLICIES,
+ doc = "Grace period (seconds) the controller leader waits for a disconnected scalable-topic "
+ + "consumer to reconnect with the same consumer name before evicting its session and "
+ + "reassigning its segments to remaining consumers."
+ )
+ private int scalableTopicConsumerSessionGracePeriodSeconds = 60;
+
@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
index 78a36c8d9d3..b63ee847e34 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.service.scalable;
import io.github.merlimat.slog.Logger;
+import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
@@ -157,12 +158,26 @@ public class ScalableTopicController {
}
private SubscriptionCoordinator createCoordinator(String subscription) {
+ // Defensive: PulsarService.getConfig() is null in some unit-test mocks. Fall
+ // back to the SubscriptionCoordinator's default grace period in that case.
+ var config = brokerService.getPulsar().getConfig();
+ if (config == null) {
+ return new SubscriptionCoordinator(
+ subscription,
+ topicName,
+ currentLayout,
+ resources,
+ brokerService.getPulsar().getExecutor());
+ }
+ Duration gracePeriod = Duration.ofSeconds(
+ config.getScalableTopicConsumerSessionGracePeriodSeconds());
return new SubscriptionCoordinator(
subscription,
topicName,
currentLayout,
resources,
- brokerService.getPulsar().getExecutor());
+ brokerService.getPulsar().getExecutor(),
+ gracePeriod);
}
private CompletableFuture<Void> electLeader() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
index 83a725fc969..1866a5a9649 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedPulsarCluster.java
@@ -148,6 +148,10 @@ public class SharedPulsarCluster {
config.setForceDeleteTenantAllowed(true);
config.setBrokerDeleteInactiveTopicsEnabled(false);
config.setBrokerDeduplicationEnabled(true);
+ // Tests rely on aggressive eviction of disconnected scalable-topic consumer
+ // sessions so reassignment-after-disconnect can be exercised in a few seconds
+ // instead of the production default of 60s.
+ config.setScalableTopicConsumerSessionGracePeriodSeconds(2);
// Reduce thread pool sizes for faster startup (fewer threads to create)
config.setNumIOThreads(2);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
index 11e71dfd069..5bcd5f127dd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
@@ -257,4 +257,179 @@ public class V5MultipleConsumersTest extends V5ClientBaseTest {
t.start();
return t;
}
+
+ /**
+ * A consumer that closes and re-subscribes with the same consumer name (within the
+ * controller grace period, before its session is evicted) must keep its segment
+ * assignment — the controller re-attaches the new subscribe to the existing
+ * session without rebalancing.
+ */
+ @Test
+ public void testStreamConsumerReconnectWithinGraceKeepsAssignment() throws Exception {
+ String topic = newScalableTopic(4);
+ String subscription = "stream-reconnect-sub";
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ StreamConsumer<String> alice = v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("alice")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ @Cleanup
+ StreamConsumer<String> bob = v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("bob")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // First batch: drain across both, capture each consumer's per-key share.
+ int firstN = 80;
+ Set<String> firstSent = new HashSet<>();
+ for (int i = 0; i < firstN; i++) {
+ String v = "first-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ firstSent.add(v);
+ }
+
+ Set<String> aliceFirst = ConcurrentHashMap.newKeySet();
+ Set<String> bobFirst = ConcurrentHashMap.newKeySet();
+ Set<String> received1 = ConcurrentHashMap.newKeySet();
+ Thread t1 = drainStreamTo(alice, received1, aliceFirst);
+ Thread t2 = drainStreamTo(bob, received1, bobFirst);
+ t1.join();
+ t2.join();
+ assertEquals(received1, firstSent, "first batch must be delivered exactly once");
+ assertTrue(!aliceFirst.isEmpty() && !bobFirst.isEmpty(),
+ "controller must split the first batch across alice + bob");
+
+ // Close alice's consumer and re-subscribe under the same name. The controller
+ // sees the existing session for "alice", attaches the new subscribe to it, and
+ // pushes back the same assignment without touching bob.
+ alice.close();
+ @Cleanup
+ StreamConsumer<String> aliceRejoined = v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("alice")
+ .subscribe();
+
+ // Second batch: pin the same key set so each generation routes to the same
+ // segments — alice's per-key share must match across batches (modulo prefix).
+ int secondN = 80;
+ Set<String> secondSent = new HashSet<>();
+ for (int i = 0; i < secondN; i++) {
+ String v = "second-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ secondSent.add(v);
+ }
+
+ Set<String> aliceSecond = ConcurrentHashMap.newKeySet();
+ Set<String> bobSecond = ConcurrentHashMap.newKeySet();
+ Set<String> received2 = ConcurrentHashMap.newKeySet();
+ Thread r1 = drainStreamTo(aliceRejoined, received2, aliceSecond);
+ Thread r2 = drainStreamTo(bob, received2, bobSecond);
+ r1.join();
+ r2.join();
+ assertEquals(received2, secondSent, "second batch must be delivered exactly once");
+
+ Set<String> aliceFirstKeys = stripPrefix(aliceFirst, "first-");
+ Set<String> aliceSecondKeys = stripPrefix(aliceSecond, "second-");
+ assertEquals(aliceSecondKeys, aliceFirstKeys,
+ "alice must keep her segments after re-subscribe within grace");
+ Set<String> bobFirstKeys = stripPrefix(bobFirst, "first-");
+ Set<String> bobSecondKeys = stripPrefix(bobSecond, "second-");
+ assertEquals(bobSecondKeys, bobFirstKeys,
+ "bob must keep his segments unchanged when alice re-subscribes");
+ }
+
+ /**
+ * If a consumer disconnects and stays away past the controller grace period, its
+ * session is evicted and segments rebalanced to remaining peers. We force a
+ * disconnect by closing alice's dedicated client (consumer.close() alone doesn't
+ * sever the underlying connection), then trust the rebalance to land while bob
+ * drains the second batch.
+ */
+ @Test
+ public void testStreamConsumerDisconnectPastGraceTriggersReassignment() throws Exception {
+ String topic = newScalableTopic(4);
+ String subscription = "stream-evict-sub";
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // alice on her own client so we can sever the connection independently.
+ PulsarClient aliceClient = newV5Client();
+ StreamConsumer<String> alice = aliceClient.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("alice")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+ @Cleanup
+ StreamConsumer<String> bob = v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName(subscription)
+ .consumerName("bob")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ int firstN = 60;
+ for (int i = 0; i < firstN; i++) {
+ producer.newMessage().key("k-" + i).value("first-" + i).send();
+ }
+
+ Set<String> received1 = ConcurrentHashMap.newKeySet();
+ Set<String> aliceFirst = ConcurrentHashMap.newKeySet();
+ Set<String> bobFirst = ConcurrentHashMap.newKeySet();
+ Thread t1 = drainStreamTo(alice, received1, aliceFirst);
+ Thread t2 = drainStreamTo(bob, received1, bobFirst);
+ t1.join();
+ t2.join();
+ assertTrue(!aliceFirst.isEmpty() && !bobFirst.isEmpty(),
+ "controller must split first batch across alice + bob");
+
+ // Sever alice's connection. The broker arms the grace timer; once it fires the
+ // rebalance pushes a new assignment to bob covering every active segment.
+ aliceClient.close();
+
+ // Produce immediately and drain bob until we've received all second-batch
+ // messages — the rebalance will land mid-drain and bob picks up alice's
+ // segments. Cap with a generous total deadline so a regression doesn't hang.
+ int secondN = 60;
+ Set<String> secondSent = new HashSet<>();
+ for (int i = 0; i < secondN; i++) {
+ String v = "second-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ secondSent.add(v);
+ }
+
+ Set<String> bobSecond = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (bobSecond.size() < secondN && System.currentTimeMillis() < deadline) {
+ Message<String> msg = bob.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ bobSecond.add(msg.value());
+ }
+ }
+ assertEquals(bobSecond, secondSent,
+ "after past-grace eviction bob must end up serving every segment");
+ }
+
+ private static Set<String> stripPrefix(Set<String> values, String prefix) {
+ Set<String> out = new HashSet<>(values.size());
+ for (String v : values) {
+ if (v.startsWith(prefix)) {
+ out.add(v.substring(prefix.length()));
+ }
+ }
+ return out;
+ }
}