This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 35ec14a6460 [fix][client] PIP-468: Wire V5 StreamConsumer to broker
SubscriptionCoordinator (#25612)
35ec14a6460 is described below
commit 35ec14a6460017db8844dbb5e48c68869ba4c8de
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 29 06:03:34 2026 -0700
[fix][client] PIP-468: Wire V5 StreamConsumer to broker
SubscriptionCoordinator (#25612)
---
.../api/v5/V5ConcurrentProducersConsumersTest.java | 151 ++++++++++++
.../client/api/v5/V5MultipleConsumersTest.java | 260 +++++++++++++++++++++
.../client/impl/v5/ScalableConsumerClient.java | 251 ++++++++++++++++++++
.../client/impl/v5/ScalableStreamConsumer.java | 80 ++++---
.../client/impl/v5/StreamConsumerBuilderV5.java | 23 +-
.../org/apache/pulsar/client/impl/ClientCnx.java | 72 ++++++
.../pulsar/client/impl/PulsarClientImpl.java | 2 +-
.../client/impl/ScalableConsumerSession.java | 43 ++++
.../apache/pulsar/common/protocol/Commands.java | 21 ++
9 files changed, 865 insertions(+), 38 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConcurrentProducersConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConcurrentProducersConsumersTest.java
new file mode 100644
index 00000000000..5c02b81afc9
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConcurrentProducersConsumersTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Sustained-throughput coverage: many producers + many QueueConsumers running
+ * concurrently against the same multi-segment topic + subscription. Asserts no message
+ * is dropped or duplicated under load, and that every consumer pulls its share.
+ */
+public class V5ConcurrentProducersConsumersTest extends V5ClientBaseTest {
+
+    @Test
+    public void testManyProducersAndConsumersConcurrent() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "concurrent-sub";
+
+        int numProducers = 4;
+        int numConsumers = 3;
+        int messagesPerProducer = 100;
+        int totalMessages = numProducers * messagesPerProducer;
+
+        // Subscribe consumers up front so the subscription cursor is anchored at the
+        // start before producers begin.
+        List<QueueConsumer<String>> consumers = new ArrayList<>();
+        for (int i = 0; i < numConsumers; i++) {
+            consumers.add(v5Client.newQueueConsumer(Schema.string())
+                    .topic(topic)
+                    .subscriptionName(subscription)
+                    .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                    .subscribe());
+        }
+
+        // Per-consumer received sets so we can verify load distribution and disjointness.
+        List<Set<String>> perConsumer = new ArrayList<>();
+        for (int i = 0; i < numConsumers; i++) {
+            perConsumer.add(ConcurrentHashMap.newKeySet());
+        }
+        Set<String> received = ConcurrentHashMap.newKeySet();
+
+        // Failures raised on worker threads. An exception thrown on a plain Thread does not
+        // fail the test, so workers record them here and the main thread asserts on them.
+        // Duplicate deliveries are recorded here too, since a Set alone cannot reveal them.
+        Set<Throwable> errors = ConcurrentHashMap.newKeySet();
+
+        // Set once every producer thread has returned. The drainers start polling before
+        // any producer exists, and producer creation can take longer than the receive
+        // timeout, so an empty receive may only end a drainer after this flag is set.
+        AtomicBoolean producersDone = new AtomicBoolean(false);
+
+        // Drainer threads: each consumer pulls until a full 1s receive window that began
+        // after all producers finished passes without a message.
+        List<Thread> drainers = new ArrayList<>();
+        for (int i = 0; i < numConsumers; i++) {
+            final int idx = i;
+            Thread t = new Thread(() -> {
+                QueueConsumer<String> c = consumers.get(idx);
+                Set<String> mine = perConsumer.get(idx);
+                try {
+                    while (true) {
+                        // Sample the flag before waiting so an idle window only counts as
+                        // "done" if it started after the last producer finished.
+                        boolean producersFinished = producersDone.get();
+                        Message<String> msg = c.receive(Duration.ofSeconds(1));
+                        if (msg == null) {
+                            if (producersFinished) {
+                                return;
+                            }
+                            continue;
+                        }
+                        String value = msg.value();
+                        mine.add(value);
+                        if (!received.add(value)) {
+                            errors.add(new AssertionError("duplicate delivery of " + value));
+                        }
+                        c.acknowledge(msg.id());
+                    }
+                } catch (Exception e) {
+                    errors.add(e);
+                }
+            }, "drainer-" + i);
+            t.start();
+            drainers.add(t);
+        }
+
+        // Producer threads: each spins up its own V5 producer and sends a slice.
+        Set<String> sent = ConcurrentHashMap.newKeySet();
+        List<Thread> producerThreads = new ArrayList<>();
+        for (int p = 0; p < numProducers; p++) {
+            final int producerIdx = p;
+            Thread t = new Thread(() -> {
+                try (Producer<String> producer = v5Client.newProducer(Schema.string())
+                        .topic(topic)
+                        .create()) {
+                    for (int i = 0; i < messagesPerProducer; i++) {
+                        String v = "p" + producerIdx + "-m" + i;
+                        producer.newMessage().key("k-" + producerIdx + "-" + i).value(v).send();
+                        sent.add(v);
+                    }
+                } catch (Exception e) {
+                    errors.add(e);
+                }
+            }, "producer-" + p);
+            t.start();
+            producerThreads.add(t);
+        }
+
+        for (Thread t : producerThreads) {
+            t.join();
+        }
+        // From here on, an idle receive window means the topic is drained.
+        producersDone.set(true);
+
+        // Wait for drainers to finish. Cap the wait so a regression doesn't hang the suite,
+        // and remember which ones overran so the test fails instead of silently continuing.
+        for (Thread t : drainers) {
+            t.join(60_000L);
+        }
+        List<String> stuckDrainers = new ArrayList<>();
+        for (Thread t : drainers) {
+            if (t.isAlive()) {
+                stuckDrainers.add(t.getName());
+            }
+        }
+
+        for (var c : consumers) {
+            c.close();
+        }
+
+        assertTrue(stuckDrainers.isEmpty(), "drainers still running after timeout: " + stuckDrainers);
+        assertTrue(errors.isEmpty(), "worker threads failed: " + errors);
+        assertEquals(sent.size(), totalMessages, "every producer must complete its slice");
+        assertEquals(received, sent, "every produced message must be consumed exactly once");
+
+        // No duplicates across consumers.
+        for (int i = 0; i < numConsumers; i++) {
+            for (int j = i + 1; j < numConsumers; j++) {
+                Set<String> overlap = new HashSet<>(perConsumer.get(i));
+                overlap.retainAll(perConsumer.get(j));
+                assertTrue(overlap.isEmpty(),
+                        "consumers " + i + "/" + j + " must not overlap, got " + overlap.size()
+                                + " duplicates");
+            }
+        }
+
+        // Load distribution: every consumer must have pulled a non-trivial share.
+        for (int i = 0; i < numConsumers; i++) {
+            assertTrue(!perConsumer.get(i).isEmpty(),
+                    "consumer #" + i + " must receive at least one message");
+        }
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
new file mode 100644
index 00000000000..11e71dfd069
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for multiple consumers attached to the same subscription on a multi-segment
+ * scalable topic. Each consumer type has different multiplicity semantics:
+ * <ul>
+ *   <li><b>QueueConsumer</b> — Shared subscription per segment. Multiple consumers share
+ *       the load: messages distribute across consumers, every message is delivered
+ *       exactly once across the consumer set.</li>
+ *   <li><b>StreamConsumer</b> — Exclusive subscription per segment. The broker's
+ *       SubscriptionCoordinator assigns each segment to one consumer of the subscription
+ *       and rebalances when another consumer attaches, so stream consumers on the same
+ *       subscription split the segments into disjoint subsets.</li>
+ *   <li><b>CheckpointConsumer</b> — uses readers, no broker-side cursor. Multiple
+ *       checkpoint consumers each independently read the full stream — no load
+ *       balancing, full duplication.</li>
+ * </ul>
+ *
+ * <p>Drainer threads record every exception (and every duplicate delivery) instead of
+ * swallowing it, and the tests assert that none were recorded.
+ */
+public class V5MultipleConsumersTest extends V5ClientBaseTest {
+
+    /** Upper bound on how long a test waits for each drainer thread to go idle. */
+    private static final long DRAIN_JOIN_TIMEOUT_MS = 60_000L;
+
+    @Test
+    public void testQueueConsumersShareLoad() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "shared-load-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Three consumers on the same subscription. Subscribe before producing so the
+        // load distributes from the start.
+        @Cleanup
+        QueueConsumer<String> c1 = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic).subscriptionName(subscription)
+                .subscribe();
+        @Cleanup
+        QueueConsumer<String> c2 = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic).subscriptionName(subscription)
+                .subscribe();
+        @Cleanup
+        QueueConsumer<String> c3 = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic).subscriptionName(subscription)
+                .subscribe();
+
+        int n = 150;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain in parallel: each consumer pulls until it sees no message for a short
+        // window. The set of values seen across all three must equal `sent` exactly.
+        Set<String> received = ConcurrentHashMap.newKeySet();
+        Set<String> c1Got = ConcurrentHashMap.newKeySet();
+        Set<String> c2Got = ConcurrentHashMap.newKeySet();
+        Set<String> c3Got = ConcurrentHashMap.newKeySet();
+        Set<Throwable> errors = ConcurrentHashMap.newKeySet();
+        awaitDrainers(errors,
+                drainTo(c1, received, c1Got, errors),
+                drainTo(c2, received, c2Got, errors),
+                drainTo(c3, received, c3Got, errors));
+
+        assertEquals(received, sent, "every message must be delivered exactly once");
+
+        // No duplicates: the three per-consumer sets must be pairwise disjoint.
+        Set<String> overlap12 = new HashSet<>(c1Got);
+        overlap12.retainAll(c2Got);
+        assertTrue(overlap12.isEmpty(), "c1/c2 overlap: " + overlap12);
+        Set<String> overlap13 = new HashSet<>(c1Got);
+        overlap13.retainAll(c3Got);
+        assertTrue(overlap13.isEmpty(), "c1/c3 overlap: " + overlap13);
+        Set<String> overlap23 = new HashSet<>(c2Got);
+        overlap23.retainAll(c3Got);
+        assertTrue(overlap23.isEmpty(), "c2/c3 overlap: " + overlap23);
+
+        // Load distribution: each consumer must have received at least one message —
+        // otherwise the "shared" semantics are broken.
+        assertTrue(!c1Got.isEmpty() && !c2Got.isEmpty() && !c3Got.isEmpty(),
+                "each consumer must get at least one message"
+                        + " (c1=" + c1Got.size() + " c2=" + c2Got.size() + " c3=" + c3Got.size() + ")");
+    }
+
+    /**
+     * Starts a thread that receives from {@code consumer} until a 500ms receive comes back
+     * empty, acknowledging each message. Values go into {@code mine} and {@code all}; a
+     * value already present in {@code all} is recorded in {@code errors} as a duplicate,
+     * as is any exception thrown while receiving or acknowledging.
+     */
+    private Thread drainTo(QueueConsumer<String> consumer, Set<String> all, Set<String> mine,
+                           Set<Throwable> errors) {
+        Thread t = new Thread(() -> {
+            try {
+                while (true) {
+                    Message<String> msg = consumer.receive(Duration.ofMillis(500));
+                    if (msg == null) {
+                        return;
+                    }
+                    String value = msg.value();
+                    mine.add(value);
+                    if (!all.add(value)) {
+                        errors.add(new AssertionError("duplicate delivery of " + value));
+                    }
+                    consumer.acknowledge(msg.id());
+                }
+            } catch (Exception e) {
+                errors.add(e);
+            }
+        }, "queue-consumer-drainer");
+        t.start();
+        return t;
+    }
+
+    @Test
+    public void testCheckpointConsumersEachSeeFullStream() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        int n = 50;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // CheckpointConsumer uses Readers: there's no broker-side subscription cursor,
+        // so multiple consumers don't load-balance — each independently reads the full
+        // stream from the requested start position.
+        @Cleanup
+        CheckpointConsumer<String> a = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+        @Cleanup
+        CheckpointConsumer<String> b = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        Set<String> aGot = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = a.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "consumer A missed message #" + i);
+            aGot.add(msg.value());
+        }
+        Set<String> bGot = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = b.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "consumer B missed message #" + i);
+            bGot.add(msg.value());
+        }
+        assertEquals(aGot, sent, "consumer A must see every produced message");
+        assertEquals(bGot, sent, "consumer B must see every produced message");
+    }
+
+    @Test
+    public void testStreamConsumersSplitSegmentsAcrossConsumers() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "stream-split-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Two stream consumers on the same subscription. The broker's
+        // SubscriptionCoordinator rebalances on the second attach, splitting the four
+        // segments across the two consumers — each ends up with a disjoint subset.
+        @Cleanup
+        StreamConsumer<String> a = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        @Cleanup
+        StreamConsumer<String> b = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 100;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain in parallel: each consumer pulls until idle.
+        Set<String> received = ConcurrentHashMap.newKeySet();
+        Set<String> aGot = ConcurrentHashMap.newKeySet();
+        Set<String> bGot = ConcurrentHashMap.newKeySet();
+        Set<Throwable> errors = ConcurrentHashMap.newKeySet();
+        awaitDrainers(errors,
+                drainStreamTo(a, received, aGot, errors),
+                drainStreamTo(b, received, bGot, errors));
+
+        assertEquals(received, sent, "every message must be delivered exactly once across consumers");
+
+        Set<String> overlap = new HashSet<>(aGot);
+        overlap.retainAll(bGot);
+        assertTrue(overlap.isEmpty(),
+                "no message should be delivered to both stream consumers, overlap=" + overlap);
+
+        assertTrue(!aGot.isEmpty() && !bGot.isEmpty(),
+                "controller must split segments across both consumers"
+                        + " (a=" + aGot.size() + " b=" + bGot.size() + ")");
+    }
+
+    /**
+     * Starts a thread that receives from {@code consumer} until a 1s receive comes back
+     * empty, then cumulatively acknowledges the last message it saw. Duplicates (a value
+     * already in {@code all}) and any exception, including a failed cumulative ack, are
+     * recorded in {@code errors}.
+     */
+    private Thread drainStreamTo(StreamConsumer<String> consumer, Set<String> all,
+                                 Set<String> mine, Set<Throwable> errors) {
+        Thread t = new Thread(() -> {
+            try {
+                MessageId last = null;
+                while (true) {
+                    Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+                    if (msg == null) {
+                        if (last != null) {
+                            consumer.acknowledgeCumulative(last);
+                        }
+                        return;
+                    }
+                    String value = msg.value();
+                    mine.add(value);
+                    if (!all.add(value)) {
+                        errors.add(new AssertionError("duplicate delivery of " + value));
+                    }
+                    last = msg.id();
+                }
+            } catch (Exception e) {
+                errors.add(e);
+            }
+        }, "stream-consumer-drainer");
+        t.start();
+        return t;
+    }
+
+    /**
+     * Joins each drainer for at most {@link #DRAIN_JOIN_TIMEOUT_MS}, failing the test if one
+     * is still running afterwards (so a regression cannot hang the suite), and then fails
+     * if any drainer recorded an error.
+     */
+    private static void awaitDrainers(Set<Throwable> errors, Thread... drainers)
+            throws InterruptedException {
+        for (Thread t : drainers) {
+            t.join(DRAIN_JOIN_TIMEOUT_MS);
+            assertTrue(!t.isAlive(),
+                    t.getName() + " did not finish within " + DRAIN_JOIN_TIMEOUT_MS + " ms");
+        }
+        assertTrue(errors.isEmpty(), "drainer threads failed: " + errors);
+    }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
new file mode 100644
index 00000000000..cceaa3b3edf
--- /dev/null
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import io.github.merlimat.slog.Logger;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ScalableConsumerSession;
+import org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
+import org.apache.pulsar.common.api.proto.ScalableAssignedSegment;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.scalable.HashRange;
+
+/**
+ * Client-side session for a scalable-topic consumer (Stream or Checkpoint).
+ *
+ * <p>Sends a {@code CommandScalableTopicSubscribe} on attach, awaits the initial
+ * {@link ScalableConsumerAssignment}, and then accepts pushed updates via
+ * {@link #onAssignmentUpdate}. The current assignment is exposed as a list of
+ * {@link ActiveSegment} entries (segmentId, hashRange, segmentTopic) so the consumer
+ * implementations can use the same segment-attach plumbing they already have.
+ *
+ * <p>Mirrors {@link DagWatchClient} in shape; the wire path is different — subscribe
+ * is request/response (matched by request id) and updates are tagged with the
+ * client-chosen consumer id.
+ *
+ * <p>The session is unregistered from its connection when the subscribe write fails,
+ * when the subscribe request fails, and on {@link #close()} — including a close that
+ * races with {@link #start()}.
+ */
+final class ScalableConsumerClient implements ScalableConsumerSession, AutoCloseable {
+
+    private static final Logger LOG = Logger.get(ScalableConsumerClient.class);
+    private final Logger log;
+
+    private final PulsarClientImpl v4Client;
+    private final TopicName topicName;
+    private final String subscription;
+    private final String consumerName;
+    private final long consumerId;
+    private final ScalableConsumerType consumerType;
+
+    private final AtomicReference<List<ActiveSegment>> currentAssignment =
+            new AtomicReference<>(List.of());
+    private volatile long currentEpoch = -1L;
+    private final CompletableFuture<List<ActiveSegment>> initialAssignmentFuture =
+            new CompletableFuture<>();
+    private volatile AssignmentChangeListener listener;
+    private volatile ClientCnx cnx;
+    private volatile boolean closed = false;
+
+    ScalableConsumerClient(PulsarClientImpl v4Client,
+                           TopicName topicName,
+                           String subscription,
+                           String consumerName,
+                           ScalableConsumerType consumerType) {
+        this.v4Client = v4Client;
+        this.topicName = topicName;
+        this.subscription = subscription;
+        this.consumerName = consumerName;
+        this.consumerId = v4Client.newConsumerId();
+        this.consumerType = consumerType;
+        this.log = LOG.with()
+                .attr("topic", topicName)
+                .attr("subscription", subscription)
+                .attr("consumerName", consumerName)
+                .attr("consumerId", consumerId)
+                .build();
+    }
+
+    /**
+     * Resolve the controller-leader broker URL via a DAG-watch lookup, open a
+     * connection to it directly (no v4 topic lookup — scalable topic URIs aren't
+     * resolvable through the v4 lookup service), then send the subscribe command and
+     * complete with the initial assignment.
+     *
+     * <p>The returned future fails if any step fails, or if {@link #close()} is called
+     * before the initial assignment arrives.
+     */
+    CompletableFuture<List<ActiveSegment>> start() {
+        DagWatchClient watch = new DagWatchClient(v4Client, topicName);
+        watch.start()
+                .thenCompose(layout -> {
+                    String controllerUrl = layout.controllerBrokerUrl();
+                    if (controllerUrl == null || controllerUrl.isEmpty()) {
+                        // Controller leader election hasn't completed yet (or the broker
+                        // doesn't advertise the URL). Fall back to the configured
+                        // service URL — any broker will forward the subscribe request
+                        // to the controller via getOrCreateController.
+                        log.info()
+                                .log("Layout has no controller URL; connecting to service URL");
+                        return v4Client.getConnectionToServiceUrl();
+                    }
+                    URI uri = URI.create(controllerUrl);
+                    InetSocketAddress addr = InetSocketAddress.createUnresolved(
+                            uri.getHost(), uri.getPort());
+                    return v4Client.getConnection(addr, addr,
+                            v4Client.getCnxPool().genRandomKeyToSelectCon());
+                })
+                .whenComplete((cnx, ex) -> {
+                    // Close the watch session as soon as we have the controller URL —
+                    // the controller pushes assignment updates directly, so we don't
+                    // need a long-lived layout watch on this consumer.
+                    watch.close();
+                })
+                .thenAccept(this::sendSubscribe)
+                .exceptionally(ex -> {
+                    initialAssignmentFuture.completeExceptionally(ex);
+                    return null;
+                });
+        return initialAssignmentFuture;
+    }
+
+    /**
+     * Registers this session on {@code cnx} so pushed updates are routed here, sends the
+     * subscribe command and completes the initial-assignment future from the response.
+     * Every failure path removes the registration again.
+     */
+    private void sendSubscribe(ClientCnx cnx) {
+        this.cnx = cnx;
+        cnx.registerScalableConsumerSession(consumerId, this);
+        if (closed) {
+            // close() ran while the connection was being resolved and may not have seen
+            // this cnx, so undo the registration here rather than leaking it.
+            cnx.removeScalableConsumerSession(consumerId);
+            initialAssignmentFuture.completeExceptionally(
+                    new PulsarClientException("Scalable consumer session closed before subscribe"));
+            return;
+        }
+
+        long requestId = v4Client.newRequestId();
+        var responseFuture = new TimedCompletableFuture<ScalableConsumerAssignment>();
+        cnx.getPendingRequests().put(requestId, responseFuture);
+
+        cnx.ctx().writeAndFlush(Commands.newScalableTopicSubscribe(
+                        requestId,
+                        topicName.toString(),
+                        subscription,
+                        consumerName,
+                        consumerId,
+                        consumerType))
+                .addListener(writeFuture -> {
+                    if (!writeFuture.isSuccess()) {
+                        cnx.getPendingRequests().remove(requestId);
+                        cnx.removeScalableConsumerSession(consumerId);
+                        initialAssignmentFuture.completeExceptionally(
+                                new PulsarClientException(writeFuture.cause()));
+                    }
+                });
+
+        responseFuture.whenComplete((assignment, ex) -> {
+            if (ex != null) {
+                // The subscribe was rejected or failed: stop routing updates for this
+                // consumer id to a session that never became active.
+                cnx.removeScalableConsumerSession(consumerId);
+                initialAssignmentFuture.completeExceptionally(ex);
+                return;
+            }
+            List<ActiveSegment> segments = toSegmentList(assignment);
+            currentAssignment.set(segments);
+            currentEpoch = assignment.getLayoutEpoch();
+            log.info().attr("epoch", currentEpoch)
+                    .attr("segments", segments.size())
+                    .log("Initial assignment received");
+            initialAssignmentFuture.complete(segments);
+        });
+    }
+
+    @Override
+    public void onAssignmentUpdate(ScalableConsumerAssignment assignment) {
+        if (closed) {
+            return;
+        }
+        long epoch = assignment.getLayoutEpoch();
+        if (epoch < currentEpoch) {
+            log.info().attr("staleEpoch", epoch).attr("currentEpoch", currentEpoch)
+                    .log("Ignoring stale assignment update");
+            return;
+        }
+        List<ActiveSegment> newSegments = toSegmentList(assignment);
+        List<ActiveSegment> oldSegments = currentAssignment.getAndSet(newSegments);
+        currentEpoch = epoch;
+        log.info().attr("epoch", epoch).attr("segments", newSegments.size())
+                .log("Assignment updated");
+
+        AssignmentChangeListener l = listener;
+        if (l != null) {
+            notifyListener(l, newSegments, oldSegments);
+        }
+    }
+
+    @Override
+    public void connectionClosed() {
+        log.warn("Scalable consumer session connection closed");
+        cnx = null;
+        if (!initialAssignmentFuture.isDone()) {
+            initialAssignmentFuture.completeExceptionally(
+                    new PulsarClientException("Connection closed before initial assignment arrived"));
+        }
+        // TODO: implement automatic re-subscribe on reconnect
+    }
+
+    /** Invokes {@code l}, logging (not propagating) anything it throws. */
+    private void notifyListener(AssignmentChangeListener l,
+                                List<ActiveSegment> newSegments,
+                                List<ActiveSegment> oldSegments) {
+        try {
+            l.onAssignmentChange(newSegments, oldSegments);
+        } catch (Exception e) {
+            log.error().exception(e).log("Error in assignment change listener");
+        }
+    }
+
+    private static List<ActiveSegment> toSegmentList(ScalableConsumerAssignment assignment) {
+        List<ActiveSegment> segments = new ArrayList<>(assignment.getSegmentsCount());
+        for (int i = 0; i < assignment.getSegmentsCount(); i++) {
+            ScalableAssignedSegment s = assignment.getSegmentAt(i);
+            segments.add(new ActiveSegment(
+                    s.getSegmentId(),
+                    HashRange.of((int) s.getHashStart(), (int) s.getHashEnd()),
+                    s.getSegmentTopic()));
+        }
+        return Collections.unmodifiableList(segments);
+    }
+
+    List<ActiveSegment> currentAssignment() {
+        return currentAssignment.get();
+    }
+
+    /**
+     * Installs the listener notified of assignment changes.
+     *
+     * <p>Updates pushed after the initial assignment but before this call are applied to
+     * {@link #currentAssignment()} with no listener to tell. If that happened, the new
+     * listener is invoked right away with (current, initial) so it catches up. If an update
+     * races with this call, the listener may see the same assignment twice.
+     */
+    void setListener(AssignmentChangeListener listener) {
+        this.listener = listener;
+        if (closed || listener == null || !initialAssignmentFuture.isDone()
+                || initialAssignmentFuture.isCompletedExceptionally()) {
+            return;
+        }
+        List<ActiveSegment> initial = initialAssignmentFuture.join();
+        List<ActiveSegment> current = currentAssignment.get();
+        // Reference comparison: every applied update installs a new list instance.
+        if (current != initial) {
+            log.info().attr("segments", current.size())
+                    .log("Replaying assignment received before listener was set");
+            notifyListener(listener, current, initial);
+        }
+    }
+
+    long consumerId() {
+        return consumerId;
+    }
+
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        ClientCnx c = cnx;
+        if (c != null) {
+            c.removeScalableConsumerSession(consumerId);
+            // No close command for now — broker reaps registrations via grace timer on
+            // disconnect. A future refactor can add an explicit unsubscribe.
+        }
+        // Fail a start() that is still in flight so its caller is not left waiting; this
+        // is a no-op once the initial assignment has been delivered.
+        initialAssignmentFuture.completeExceptionally(
+                new PulsarClientException("Scalable consumer session closed"));
+    }
+
+    interface AssignmentChangeListener {
+        void onAssignmentChange(List<ActiveSegment> newSegments, List<ActiveSegment> oldSegments);
+    }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index 31844f069bb..e33191c2861 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -56,7 +56,8 @@ import
org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
* vector. This ensures that acknowledging a single message correctly advances
* all segments, not just the one it came from.
*/
-final class ScalableStreamConsumer<T> implements StreamConsumer<T>,
DagWatchClient.LayoutChangeListener {
+final class ScalableStreamConsumer<T>
+ implements StreamConsumer<T>,
ScalableConsumerClient.AssignmentChangeListener {
private static final Logger LOG = Logger.get(ScalableStreamConsumer.class);
private final Logger log;
@@ -65,7 +66,7 @@ final class ScalableStreamConsumer<T> implements
StreamConsumer<T>, DagWatchClie
private final Schema<T> v5Schema;
private final org.apache.pulsar.client.api.Schema<T> v4Schema;
private final ConsumerConfigurationData<T> consumerConf;
- private final DagWatchClient dagWatch;
+ private final ScalableConsumerClient session;
private final String topicName;
private final String subscriptionName;
@@ -92,33 +93,36 @@ final class ScalableStreamConsumer<T> implements
StreamConsumer<T>, DagWatchClie
private ScalableStreamConsumer(PulsarClientV5 client,
Schema<T> v5Schema,
ConsumerConfigurationData<T> consumerConf,
- DagWatchClient dagWatch) {
+ ScalableConsumerClient session,
+ String topicName) {
this.client = client;
this.v5Schema = v5Schema;
this.v4Schema = SchemaAdapter.toV4(v5Schema);
this.consumerConf = consumerConf;
- this.dagWatch = dagWatch;
- this.topicName = dagWatch.topicName().toString();
+ // Controller session that supplies this consumer's segment assignment; it replaces
+ // the DagWatchClient this consumer used to hold.
+ this.session = session;
+ // Passed in by the caller (createAsync): the session exposes no topic accessor, whereas
+ // the removed code read the name from the DagWatchClient.
+ this.topicName = topicName;
this.subscriptionName = consumerConf.getSubscriptionName();
this.log = LOG.with().attr("topic", topicName).attr("subscription",
subscriptionName).build();
this.asyncView = new AsyncStreamConsumerV5<>(this);
}
/**
- * Create a fully initialized consumer asynchronously. The returned future
completes
- * only after every initial segment has been successfully subscribed. If
any segment
- * fails to subscribe, all already-subscribed segments are closed and the
future
- * completes exceptionally.
+ * Create a fully initialized consumer asynchronously. The session has
already
+ * registered with the controller and the {@code initialAssignment} list
contains
+ * the segments this consumer should attach to. The returned future
completes only
+ * after every assigned segment has been successfully subscribed.
*/
static <T> CompletableFuture<StreamConsumer<T>> createAsync(PulsarClientV5
client,
Schema<T>
v5Schema,
ConsumerConfigurationData<T> consumerConf,
- DagWatchClient
dagWatch,
-
ClientSegmentLayout initialLayout) {
- ScalableStreamConsumer<T> consumer = new
ScalableStreamConsumer<>(client, v5Schema, consumerConf, dagWatch);
- return consumer.subscribeSegments(initialLayout)
+
ScalableConsumerClient session,
+ String
topicName,
+
List<ActiveSegment> initialAssignment) {
+ ScalableStreamConsumer<T> consumer = new ScalableStreamConsumer<>(
+ client, v5Schema, consumerConf, session, topicName);
+ return consumer.subscribeAssigned(initialAssignment)
.thenApply(__ -> {
- dagWatch.setListener(consumer);
+ session.setListener(consumer);
return (StreamConsumer<T>) consumer;
})
.exceptionallyCompose(ex -> consumer.closeAsync().handle((__,
___) -> {
@@ -249,7 +253,7 @@ final class ScalableStreamConsumer<T> implements
StreamConsumer<T>, DagWatchClie
CompletableFuture<Void> closeAsync() {
closed = true;
- dagWatch.close();
+ session.close();
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (var future : segmentConsumers.values()) {
@@ -262,37 +266,49 @@ final class ScalableStreamConsumer<T> implements
StreamConsumer<T>, DagWatchClie
.whenComplete((__, ___) -> segmentConsumers.clear());
}
- // --- Layout change handling ---
+    // --- Assignment change handling ---
     @Override
-    public void onLayoutChange(ClientSegmentLayout newLayout, ClientSegmentLayout oldLayout) {
+    public void onAssignmentChange(List<ActiveSegment> newSegments, List<ActiveSegment> oldSegments) {
         // Fully async: safe to run on the netty IO thread that delivered the update.
-        subscribeSegments(newLayout).exceptionally(ex -> {
-            log.warn().exceptionMessage(ex).log("Failed to apply layout update");
+        subscribeAssigned(newSegments).exceptionally(ex -> {
+            log.warn().exceptionMessage(ex).log("Failed to apply assignment update");
             return null;
         });
     }
-    private CompletableFuture<Void> subscribeSegments(ClientSegmentLayout layout) {
-        // The stream consumer is controller-driven: it only subscribes to the segments
-        // the controller currently designates as active. When a segment moves out of the
-        // active set (because of a split or merge), we leave its v4 consumer alive so
-        // pending acks for messages already pulled from it can still be forwarded; the
-        // receive loop closes the consumer naturally once it sees TopicTerminated.
-        var activeIds = ConcurrentHashMap.<Long>newKeySet();
-        for (var seg : layout.activeSegments()) {
-            activeIds.add(seg.segmentId());
+    /**
+     * Reconciles the per-segment v4 consumers with {@code assigned}: closes the consumers
+     * of segments that are no longer assigned and subscribes to newly assigned ones.
+     * Re-applying an unchanged assignment is a no-op apart from logging, because existing
+     * entries are reused via computeIfAbsent.
+     */
+    private CompletableFuture<Void> subscribeAssigned(List<ActiveSegment> assigned) {
+        // Controller-driven assignment: the broker's SubscriptionCoordinator decides
+        // which segments this consumer owns at any moment. We subscribe to exactly
+        // those, regardless of whether the controller picked them from the active or
+        // sealed set — to the v4 layer they're just per-segment Exclusive subscriptions.
+        var assignedIds = ConcurrentHashMap.<Long>newKeySet();
+        for (var seg : assigned) {
+            assignedIds.add(seg.segmentId());
         }
-        // Subscribe to newly-active segments asynchronously.
+        // Segments that fell out of our assignment (rebalanced away to another
+        // consumer): close our v4 consumer so the Exclusive lock is released and
+        // the new owner can attach. Sealed-and-drained segments take a different
+        // path: the receive loop closes them on TopicTerminated.
+        var it = segmentConsumers.entrySet().iterator();
+        while (it.hasNext()) {
+            var entry = it.next();
+            Long segmentId = entry.getKey();
+            if (assignedIds.contains(segmentId)) {
+                continue;
+            }
+            log.info().attr("segmentId", segmentId)
+                    .log("Closing consumer for segment removed from assignment");
+            var consumerFuture = entry.getValue();
+            // Remove through the iterator instead of mutating the map mid-iteration.
+            it.remove();
+            latestDelivered.remove(segmentId);
+            // Close once the (possibly still pending) subscription completes, and report
+            // failures instead of dropping them.
+            consumerFuture
+                    .thenCompose(c -> c.closeAsync())
+                    .exceptionally(ex -> {
+                        log.warn().attr("segmentId", segmentId).exceptionMessage(ex)
+                                .log("Failed to close consumer for segment removed from assignment");
+                        return null;
+                    });
+        }
+
+        // Subscribe to newly-assigned segments.
         List<CompletableFuture<?>> futures = new ArrayList<>();
-        for (var seg : layout.activeSegments()) {
+        for (var seg : assigned) {
             futures.add(segmentConsumers.computeIfAbsent(seg.segmentId(),
                     id -> createSegmentConsumerAsync(seg)));
         }
-        log.info().attr("epoch", layout.epoch())
-                .attr("segments", activeIds).log("Stream consumer layout applied");
+        log.info().attr("segments", assignedIds).log("Stream consumer assignment applied");
         return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
     }
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index d1ab3109387..f8ae92b014a 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -31,6 +31,7 @@ import
org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicName;
/**
@@ -72,11 +73,23 @@ final class StreamConsumerBuilderV5<T> implements
StreamConsumerBuilder<T> {
}
TopicName topic = V5Utils.asScalableTopicName(topicName);
- DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
-
- return dagWatch.start()
- .thenCompose(initialLayout ->
ScalableStreamConsumer.createAsync(
- client, v5Schema, conf, dagWatch, initialLayout));
+ // Default the consumer name to a stable random when the user didn't
set one —
+ // ScalableConsumerClient uses it as the registration key with the
controller.
+ // Use a full UUID (~122 bits of entropy) so the registration key is
unique in
+ // practice, even across many concurrent attaches.
+ if (conf.getConsumerName() == null ||
conf.getConsumerName().isEmpty()) {
+ conf.setConsumerName("v5-stream-" + java.util.UUID.randomUUID());
+ }
+ ScalableConsumerClient session = new ScalableConsumerClient(
+ client.v4Client(),
+ topic,
+ conf.getSubscriptionName(),
+ conf.getConsumerName(),
+ ScalableConsumerType.STREAM);
+
+ return session.start()
+ .thenCompose(initialAssignment ->
ScalableStreamConsumer.createAsync(
+ client, v5Schema, conf, session, topic.toString(),
initialAssignment));
}
@Override
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 7ab0b4f0239..75131f0af1d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -167,6 +167,17 @@ public class ClientCnx extends PulsarHandler {
.concurrencyLevel(1)
.build();
+    /**
+     * Per-consumer scalable subscribe sessions, keyed by the {@code consumerId} the V5
+     * client assigned at subscribe time. The broker tags every
+     * {@link org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate} with
+     * this id, and {@code handleCommandScalableTopicAssignmentUpdate} dispatches on it.
+     * When the connection closes, every session is notified and the map is cleared.
+     */
+    private final ConcurrentLongHashMap<ScalableConsumerSession> scalableConsumerSessions =
+            ConcurrentLongHashMap.<ScalableConsumerSession>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(1)
+                    .build();
+
private final CompletableFuture<Void> connectionFuture = new
CompletableFuture<Void>();
private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new
ConcurrentLinkedQueue<>();
@@ -361,6 +372,7 @@ public class ClientCnx extends PulsarHandler {
transactionMetaStoreHandlers.forEach((id, handler) ->
handler.connectionClosed(this));
topicListWatchers.forEach((__, watcher) ->
watcher.connectionClosed(this));
dagWatchSessions.forEach((__, session) -> session.connectionClosed());
+ scalableConsumerSessions.forEach((__, session) ->
session.connectionClosed());
waitingLookupRequests.clear();
@@ -368,6 +380,7 @@ public class ClientCnx extends PulsarHandler {
consumers.clear();
topicListWatchers.clear();
dagWatchSessions.clear();
+ scalableConsumerSessions.clear();
timeoutTask.cancel(true);
}
@@ -1351,6 +1364,65 @@ public class ClientCnx extends PulsarHandler {
dagWatchSessions.remove(sessionId);
}
+    /**
+     * Completes the pending scalable-topic subscribe request identified by
+     * {@code cmd.getRequestId()}.
+     *
+     * <p>On error, the pending future is failed with the exception mapped from the broker's
+     * {@code ServerError} via {@link #getPulsarClientException}. Callers can therefore tell
+     * permanent failures (e.g. authorization) from retriable ones. On success, the future is
+     * completed with a detached copy of the returned assignment. A response that carries
+     * neither an error nor an assignment fails the future rather than leaving it hanging.
+     */
+    @Override
+    protected void handleCommandScalableTopicSubscribeResponse(
+            org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse cmd) {
+        checkArgument(state == State.Ready);
+
+        long requestId = cmd.getRequestId();
+        log.debug().attr("requestId", requestId).log("Received scalableTopicSubscribeResponse");
+
+        CompletableFuture<? extends Object> pending = pendingRequests.remove(requestId);
+        if (pending == null || pending.isDone()) {
+            log.warn().attr("requestId", requestId)
+                    .log("Received scalable topic subscribe response for unknown / completed request");
+            return;
+        }
+
+        if (cmd.hasError()) {
+            String errorMsg = "Scalable topic subscribe failed: " + cmd.getError()
+                    + (cmd.hasMessage() ? " - " + cmd.getMessage() : "");
+            // Map to the typed exception so retry logic can classify the failure correctly.
+            pending.completeExceptionally(getPulsarClientException(cmd.getError(), errorMsg));
+            return;
+        }
+
+        // The future was already removed from pendingRequests, so it can no longer time out.
+        // Fail it explicitly instead of letting getAssignment() throw and strand the caller.
+        if (!cmd.hasAssignment()) {
+            pending.completeExceptionally(new PulsarClientException(
+                    "Scalable topic subscribe response for request " + requestId
+                            + " carries no assignment"));
+            return;
+        }
+
+        // Defensive copy: the proto payload's backing buffer is reused by the decoder once we
+        // return. Build a snapshot the future's downstream consumers can hold.
+        var assignment = new org.apache.pulsar.common.api.proto.ScalableConsumerAssignment();
+        assignment.copyFrom(cmd.getAssignment());
+
+        @SuppressWarnings("unchecked")
+        CompletableFuture<org.apache.pulsar.common.api.proto.ScalableConsumerAssignment> future =
+                (CompletableFuture<org.apache.pulsar.common.api.proto.ScalableConsumerAssignment>) pending;
+        future.complete(assignment);
+    }
+
+    /**
+     * Routes a broker-pushed assignment update to the scalable consumer session registered
+     * under the command's {@code consumerId}. Updates for ids with no registered session are
+     * logged and dropped.
+     */
+    @Override
+    protected void handleCommandScalableTopicAssignmentUpdate(
+            org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate cmd) {
+        checkArgument(state == State.Ready);
+
+        final long targetConsumerId = cmd.getConsumerId();
+        final ScalableConsumerSession target = scalableConsumerSessions.get(targetConsumerId);
+        if (target != null) {
+            // Hand the session its own copy rather than the decoder-owned command payload.
+            var snapshot = new org.apache.pulsar.common.api.proto.ScalableConsumerAssignment();
+            snapshot.copyFrom(cmd.getAssignment());
+            target.onAssignmentUpdate(snapshot);
+            return;
+        }
+        log.warn().attr("consumerId", targetConsumerId)
+                .log("Received scalable topic assignment update for unknown consumer");
+    }
+
+    /**
+     * Registers {@code session} to receive the assignment updates the broker tags with
+     * {@code consumerId}, replacing any session previously registered under that id.
+     */
+    public void registerScalableConsumerSession(long consumerId, ScalableConsumerSession session) {
+        this.scalableConsumerSessions.put(consumerId, session);
+    }
+
+    /**
+     * Unregisters the scalable consumer session for {@code consumerId}. Later assignment
+     * updates for that id no longer reach it.
+     */
+    public void removeScalableConsumerSession(long consumerId) {
+        this.scalableConsumerSessions.remove(consumerId);
+    }
+
/**
* check serverError and take appropriate action.
* <ul>
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index baf6198420e..a8c7b46b3db 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1309,7 +1309,7 @@ public class PulsarClientImpl implements PulsarClient {
return producerIdGenerator.getAndIncrement();
}
- long newConsumerId() {
+ /**
+ * Returns the next value of this client's consumer id counter. Made public so that code
+ * outside this package (presumably the V5 client, which assigns the consumerId used for
+ * scalable subscribe sessions) can allocate ids — TODO confirm the intended callers.
+ */
+ public long newConsumerId() {
return consumerIdGenerator.getAndIncrement();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableConsumerSession.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableConsumerSession.java
new file mode 100644
index 00000000000..ff42253a547
--- /dev/null
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableConsumerSession.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+
+/**
+ * Callback interface for a registered scalable-topic consumer. Implemented by the V5
+ * client's per-consumer session. The broker pushes
+ * {@link org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate} messages
+ * tagged with the consumer's {@code consumerId}; {@link ClientCnx} dispatches those to the
+ * matching session.
+ */
+public interface ScalableConsumerSession {
+
+    /**
+     * Called when the broker pushes a new segment assignment to this consumer (after a
+     * peer joined/left the subscription or the topic layout changed).
+     *
+     * @param assignment the new assignment; {@link ClientCnx} passes its own copy of the
+     *                   received payload, so it remains valid after this call returns
+     */
+    void onAssignmentUpdate(ScalableConsumerAssignment assignment);
+
+    /**
+     * Called when the connection to the broker is closed. Implementations should mark the
+     * session as needing reconnection and re-register on the next available connection;
+     * the closing {@link ClientCnx} drops its registration for this session.
+     */
+    void connectionClosed();
+}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index c805164ef92..227aae5ee17 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -106,6 +106,7 @@ import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
import org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.common.api.proto.ServerError;
@@ -1709,6 +1710,26 @@ public class Commands {
return serializeWithSize(cmd);
}
+    /**
+     * Builds the Client -> Broker command that registers a scalable consumer (Stream or
+     * Checkpoint) and asks for its initial segment assignment. The broker leader persists the
+     * registration and answers with a
+     * {@link org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse}.
+     *
+     * @param requestId    id used to correlate the broker's response with this request
+     * @param topic        scalable topic name
+     * @param subscription subscription name
+     * @param consumerName name under which the consumer registers
+     * @param consumerId   client-side consumer id carried in the command
+     * @param consumerType kind of scalable consumer being registered
+     * @return the command serialized by {@code serializeWithSize}
+     */
+    public static ByteBuf newScalableTopicSubscribe(long requestId, String topic,
+                                                    String subscription, String consumerName,
+                                                    long consumerId,
+                                                    ScalableConsumerType consumerType) {
+        BaseCommand command = localCmd(Type.SCALABLE_TOPIC_SUBSCRIBE);
+        var subscribe = command.setScalableTopicSubscribe();
+        subscribe.setRequestId(requestId);
+        subscribe.setTopic(topic);
+        subscribe.setSubscription(subscription);
+        subscribe.setConsumerName(consumerName);
+        subscribe.setConsumerId(consumerId);
+        subscribe.setConsumerType(consumerType);
+        return serializeWithSize(command);
+    }
+
/**
* Broker -> Client: response to a scalable-topic subscribe request. On
success the
* caller must populate the nested {@link ScalableConsumerAssignment} via