This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 35ec14a6460 [fix][client] PIP-468: Wire V5 StreamConsumer to broker 
SubscriptionCoordinator (#25612)
35ec14a6460 is described below

commit 35ec14a6460017db8844dbb5e48c68869ba4c8de
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 29 06:03:34 2026 -0700

    [fix][client] PIP-468: Wire V5 StreamConsumer to broker 
SubscriptionCoordinator (#25612)
---
 .../api/v5/V5ConcurrentProducersConsumersTest.java | 151 ++++++++++++
 .../client/api/v5/V5MultipleConsumersTest.java     | 260 +++++++++++++++++++++
 .../client/impl/v5/ScalableConsumerClient.java     | 251 ++++++++++++++++++++
 .../client/impl/v5/ScalableStreamConsumer.java     |  80 ++++---
 .../client/impl/v5/StreamConsumerBuilderV5.java    |  23 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  72 ++++++
 .../pulsar/client/impl/PulsarClientImpl.java       |   2 +-
 .../client/impl/ScalableConsumerSession.java       |  43 ++++
 .../apache/pulsar/common/protocol/Commands.java    |  21 ++
 9 files changed, 865 insertions(+), 38 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConcurrentProducersConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConcurrentProducersConsumersTest.java
new file mode 100644
index 00000000000..5c02b81afc9
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ConcurrentProducersConsumersTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Sustained-throughput coverage: many producers + many QueueConsumers running
+ * concurrently against the same multi-segment topic + subscription. Asserts 
no message
+ * is dropped or duplicated under load, and that every consumer pulls its 
share.
+ */
+public class V5ConcurrentProducersConsumersTest extends V5ClientBaseTest {
+
+    @Test
+    public void testManyProducersAndConsumersConcurrent() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "concurrent-sub";
+
+        int numProducers = 4;
+        int numConsumers = 3;
+        int messagesPerProducer = 100;
+        int totalMessages = numProducers * messagesPerProducer;
+
+        // Subscribe consumers up front so the subscription cursor is anchored 
at the
+        // start before producers begin.
+        List<QueueConsumer<String>> consumers = new ArrayList<>();
+        for (int i = 0; i < numConsumers; i++) {
+            consumers.add(v5Client.newQueueConsumer(Schema.string())
+                    .topic(topic)
+                    .subscriptionName(subscription)
+                    
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                    .subscribe());
+        }
+
+        // Per-consumer received sets so we can verify load distribution and 
disjointness.
+        List<Set<String>> perConsumer = new ArrayList<>();
+        for (int i = 0; i < numConsumers; i++) {
+            perConsumer.add(ConcurrentHashMap.newKeySet());
+        }
+        Set<String> received = ConcurrentHashMap.newKeySet();
+
+        // Drainer threads: each consumer pulls until it sees no message for a 
quiet
+        // window. They run for the full duration of the test.
+        List<Thread> drainers = new ArrayList<>();
+        for (int i = 0; i < numConsumers; i++) {
+            final int idx = i;
+            Thread t = new Thread(() -> {
+                QueueConsumer<String> c = consumers.get(idx);
+                Set<String> mine = perConsumer.get(idx);
+                try {
+                    while (true) {
+                        Message<String> msg = c.receive(Duration.ofSeconds(1));
+                        if (msg == null) {
+                            // No more messages for at least 1s — exit. 
Producers are
+                            // expected to finish well before this idle window.
+                            return;
+                        }
+                        received.add(msg.value());
+                        mine.add(msg.value());
+                        c.acknowledge(msg.id());
+                    }
+                } catch (Exception ignored) {
+                }
+            }, "drainer-" + i);
+            t.start();
+            drainers.add(t);
+        }
+
+        // Producer threads: each spins up its own V5 producer and sends a 
slice.
+        Set<String> sent = ConcurrentHashMap.newKeySet();
+        List<Thread> producerThreads = new ArrayList<>();
+        for (int p = 0; p < numProducers; p++) {
+            final int producerIdx = p;
+            Thread t = new Thread(() -> {
+                try (Producer<String> producer = 
v5Client.newProducer(Schema.string())
+                        .topic(topic)
+                        .create()) {
+                    for (int i = 0; i < messagesPerProducer; i++) {
+                        String v = "p" + producerIdx + "-m" + i;
+                        producer.newMessage().key("k-" + producerIdx + "-" + 
i).value(v).send();
+                        sent.add(v);
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }, "producer-" + p);
+            t.start();
+            producerThreads.add(t);
+        }
+
+        for (Thread t : producerThreads) {
+            t.join();
+        }
+        assertEquals(sent.size(), totalMessages, "every producer must complete 
its slice");
+
+        // Wait for drainers to finish (they exit on a 1s idle window once all 
messages
+        // are consumed). Cap the wait so a regression doesn't hang the suite.
+        for (Thread t : drainers) {
+            t.join(60_000L);
+        }
+
+        for (var c : consumers) {
+            c.close();
+        }
+
+        assertEquals(received, sent, "every produced message must be consumed 
exactly once");
+
+        // No duplicates across consumers.
+        for (int i = 0; i < numConsumers; i++) {
+            for (int j = i + 1; j < numConsumers; j++) {
+                Set<String> overlap = new HashSet<>(perConsumer.get(i));
+                overlap.retainAll(perConsumer.get(j));
+                assertTrue(overlap.isEmpty(),
+                        "consumers " + i + "/" + j + " must not overlap, got " 
+ overlap.size()
+                                + " duplicates");
+            }
+        }
+
+        // Load distribution: every consumer must have pulled a non-trivial 
share.
+        for (int i = 0; i < numConsumers; i++) {
+            assertTrue(!perConsumer.get(i).isEmpty(),
+                    "consumer #" + i + " must receive at least one message");
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
new file mode 100644
index 00000000000..11e71dfd069
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultipleConsumersTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for multiple consumers attached to the same subscription on a 
multi-segment
+ * scalable topic. Each consumer type has different multiplicity semantics:
+ * <ul>
+ *   <li><b>QueueConsumer</b> — Shared subscription per segment. Multiple 
consumers share
+ *       the load: messages distribute across consumers, every message is 
delivered
+ *       exactly once across the consumer set.</li>
+ *   <li><b>StreamConsumer</b> — Exclusive subscription per segment. A second 
consumer
+ *       on the same subscription collides on segment attach (broker enforces 
the
+ *       exclusive lock); per-consumer segment assignment via a subscription 
coordinator
+ *       is a separate, not-yet-implemented feature.</li>
+ *   <li><b>CheckpointConsumer</b> — uses readers, no broker-side cursor. 
Multiple
+ *       checkpoint consumers each independently read the full stream — no load
+ *       balancing, full duplication.</li>
+ * </ul>
+ */
+public class V5MultipleConsumersTest extends V5ClientBaseTest {
+
+    @Test
+    public void testQueueConsumersShareLoad() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "shared-load-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Three consumers on the same subscription. Subscribe before 
producing so the
+        // load distributes from the start.
+        @Cleanup
+        QueueConsumer<String> c1 = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic).subscriptionName(subscription)
+                .subscribe();
+        @Cleanup
+        QueueConsumer<String> c2 = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic).subscriptionName(subscription)
+                .subscribe();
+        @Cleanup
+        QueueConsumer<String> c3 = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic).subscriptionName(subscription)
+                .subscribe();
+
+        int n = 150;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain in parallel: each consumer pulls until it sees no message for 
a short
+        // window. The set of values seen across all three must equal `sent` 
exactly.
+        Set<String> received = ConcurrentHashMap.newKeySet();
+        Set<String> c1Got = ConcurrentHashMap.newKeySet();
+        Set<String> c2Got = ConcurrentHashMap.newKeySet();
+        Set<String> c3Got = ConcurrentHashMap.newKeySet();
+        Thread t1 = drainTo(c1, received, c1Got);
+        Thread t2 = drainTo(c2, received, c2Got);
+        Thread t3 = drainTo(c3, received, c3Got);
+        t1.join();
+        t2.join();
+        t3.join();
+
+        assertEquals(received, sent, "every message must be delivered exactly 
once");
+
+        // No duplicates: the three per-consumer sets must be pairwise 
disjoint.
+        Set<String> overlap12 = new HashSet<>(c1Got);
+        overlap12.retainAll(c2Got);
+        assertTrue(overlap12.isEmpty(), "c1/c2 overlap: " + overlap12);
+        Set<String> overlap13 = new HashSet<>(c1Got);
+        overlap13.retainAll(c3Got);
+        assertTrue(overlap13.isEmpty(), "c1/c3 overlap: " + overlap13);
+        Set<String> overlap23 = new HashSet<>(c2Got);
+        overlap23.retainAll(c3Got);
+        assertTrue(overlap23.isEmpty(), "c2/c3 overlap: " + overlap23);
+
+        // Load distribution: each consumer must have received at least one 
message —
+        // otherwise the "shared" semantics are broken.
+        assertTrue(!c1Got.isEmpty() && !c2Got.isEmpty() && !c3Got.isEmpty(),
+                "each consumer must get at least one message"
+                        + " (c1=" + c1Got.size() + " c2=" + c2Got.size() + " 
c3=" + c3Got.size() + ")");
+    }
+
+    private Thread drainTo(QueueConsumer<String> consumer, Set<String> all, 
Set<String> mine) {
+        Thread t = new Thread(() -> {
+            try {
+                while (true) {
+                    Message<String> msg = 
consumer.receive(Duration.ofMillis(500));
+                    if (msg == null) {
+                        return;
+                    }
+                    all.add(msg.value());
+                    mine.add(msg.value());
+                    consumer.acknowledge(msg.id());
+                }
+            } catch (Exception ignored) {
+            }
+        }, "queue-consumer-drainer");
+        t.start();
+        return t;
+    }
+
+    @Test
+    public void testCheckpointConsumersEachSeeFullStream() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        int n = 50;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // CheckpointConsumer uses Readers: there's no broker-side 
subscription cursor,
+        // so multiple consumers don't load-balance — each independently reads 
the full
+        // stream from the requested start position.
+        @Cleanup
+        CheckpointConsumer<String> a = 
v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+        @Cleanup
+        CheckpointConsumer<String> b = 
v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        Set<String> aGot = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = a.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "consumer A missed message #" + i);
+            aGot.add(msg.value());
+        }
+        Set<String> bGot = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = b.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "consumer B missed message #" + i);
+            bGot.add(msg.value());
+        }
+        assertEquals(aGot, sent, "consumer A must see every produced message");
+        assertEquals(bGot, sent, "consumer B must see every produced message");
+    }
+
+    @Test
+    public void testStreamConsumersSplitSegmentsAcrossConsumers() throws 
Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "stream-split-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Two stream consumers on the same subscription. The broker's
+        // SubscriptionCoordinator rebalances on the second attach, splitting 
the four
+        // segments across the two consumers — each ends up with a disjoint 
subset.
+        @Cleanup
+        StreamConsumer<String> a = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+        @Cleanup
+        StreamConsumer<String> b = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 100;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain in parallel: each consumer pulls until idle.
+        Set<String> received = ConcurrentHashMap.newKeySet();
+        Set<String> aGot = ConcurrentHashMap.newKeySet();
+        Set<String> bGot = ConcurrentHashMap.newKeySet();
+        Thread ta = drainStreamTo(a, received, aGot);
+        Thread tb = drainStreamTo(b, received, bGot);
+        ta.join();
+        tb.join();
+
+        assertEquals(received, sent, "every message must be delivered exactly 
once across consumers");
+
+        Set<String> overlap = new HashSet<>(aGot);
+        overlap.retainAll(bGot);
+        assertTrue(overlap.isEmpty(),
+                "no message should be delivered to both stream consumers, 
overlap=" + overlap);
+
+        assertTrue(!aGot.isEmpty() && !bGot.isEmpty(),
+                "controller must split segments across both consumers"
+                        + " (a=" + aGot.size() + " b=" + bGot.size() + ")");
+    }
+
+    private Thread drainStreamTo(StreamConsumer<String> consumer, Set<String> 
all, Set<String> mine) {
+        Thread t = new Thread(() -> {
+            try {
+                MessageId last = null;
+                while (true) {
+                    Message<String> msg = 
consumer.receive(Duration.ofSeconds(1));
+                    if (msg == null) {
+                        if (last != null) {
+                            consumer.acknowledgeCumulative(last);
+                        }
+                        return;
+                    }
+                    all.add(msg.value());
+                    mine.add(msg.value());
+                    last = msg.id();
+                }
+            } catch (Exception ignored) {
+            }
+        }, "stream-consumer-drainer");
+        t.start();
+        return t;
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
new file mode 100644
index 00000000000..cceaa3b3edf
--- /dev/null
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableConsumerClient.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import io.github.merlimat.slog.Logger;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.ClientCnx;
+import org.apache.pulsar.client.impl.PulsarClientImpl;
+import org.apache.pulsar.client.impl.ScalableConsumerSession;
+import org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
+import org.apache.pulsar.common.api.proto.ScalableAssignedSegment;
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.protocol.Commands;
+import org.apache.pulsar.common.scalable.HashRange;
+
+/**
+ * Client-side session for a scalable-topic consumer (Stream or Checkpoint).
+ *
+ * <p>Sends a {@code CommandScalableTopicSubscribe} on attach, awaits the 
initial
+ * {@link ScalableConsumerAssignment}, and then accepts pushed updates via
+ * {@link #onAssignmentUpdate}. The current assignment is exposed as a list of
+ * {@link ActiveSegment} entries (segmentId, hashRange, segmentTopic) so the 
consumer
+ * implementations can use the same segment-attach plumbing they already have.
+ *
+ * <p>Mirrors {@link DagWatchClient} in shape; the wire path is different — 
subscribe
+ * is request/response (matched by request id) and updates are tagged with the
+ * client-chosen consumer id.
+ */
+final class ScalableConsumerClient implements ScalableConsumerSession, 
AutoCloseable {
+
+    private static final Logger LOG = Logger.get(ScalableConsumerClient.class);
+    private final Logger log;
+
+    private final PulsarClientImpl v4Client;
+    private final TopicName topicName;
+    private final String subscription;
+    private final String consumerName;
+    private final long consumerId;
+    private final ScalableConsumerType consumerType;
+
+    private final AtomicReference<List<ActiveSegment>> currentAssignment =
+            new AtomicReference<>(List.of());
+    private volatile long currentEpoch = -1L;
+    private final CompletableFuture<List<ActiveSegment>> 
initialAssignmentFuture =
+            new CompletableFuture<>();
+    private volatile AssignmentChangeListener listener;
+    private volatile ClientCnx cnx;
+    private volatile boolean closed = false;
+
+    ScalableConsumerClient(PulsarClientImpl v4Client,
+                           TopicName topicName,
+                           String subscription,
+                           String consumerName,
+                           ScalableConsumerType consumerType) {
+        this.v4Client = v4Client;
+        this.topicName = topicName;
+        this.subscription = subscription;
+        this.consumerName = consumerName;
+        this.consumerId = v4Client.newConsumerId();
+        this.consumerType = consumerType;
+        this.log = LOG.with()
+                .attr("topic", topicName)
+                .attr("subscription", subscription)
+                .attr("consumerName", consumerName)
+                .attr("consumerId", consumerId)
+                .build();
+    }
+
+    /**
+     * Resolve the controller-leader broker URL via a DAG-watch lookup, open a
+     * connection to it directly (no v4 topic lookup — scalable topic URIs 
aren't
+     * resolvable through the v4 lookup service), then send the subscribe 
command and
+     * complete with the initial assignment.
+     */
+    CompletableFuture<List<ActiveSegment>> start() {
+        DagWatchClient watch = new DagWatchClient(v4Client, topicName);
+        watch.start()
+                .thenCompose(layout -> {
+                    String controllerUrl = layout.controllerBrokerUrl();
+                    if (controllerUrl == null || controllerUrl.isEmpty()) {
+                        // Controller leader election hasn't completed yet (or 
the broker
+                        // doesn't advertise the URL). Fall back to the 
configured
+                        // service URL — any broker will forward the subscribe 
request
+                        // to the controller via getOrCreateController.
+                        log.info()
+                                .log("Layout has no controller URL; connecting 
to service URL");
+                        return v4Client.getConnectionToServiceUrl();
+                    }
+                    URI uri = URI.create(controllerUrl);
+                    InetSocketAddress addr = 
InetSocketAddress.createUnresolved(
+                            uri.getHost(), uri.getPort());
+                    return v4Client.getConnection(addr, addr,
+                            v4Client.getCnxPool().genRandomKeyToSelectCon());
+                })
+                .whenComplete((cnx, ex) -> {
+                    // Close the watch session as soon as we have the 
controller URL —
+                    // the controller pushes assignment updates directly, so 
we don't
+                    // need a long-lived layout watch on this consumer.
+                    watch.close();
+                })
+                .thenAccept(cnx -> {
+                    this.cnx = cnx;
+                    cnx.registerScalableConsumerSession(consumerId, this);
+
+                    long requestId = v4Client.newRequestId();
+                    var responseFuture = new 
TimedCompletableFuture<ScalableConsumerAssignment>();
+                    cnx.getPendingRequests().put(requestId, responseFuture);
+
+                    cnx.ctx().writeAndFlush(Commands.newScalableTopicSubscribe(
+                                    requestId,
+                                    topicName.toString(),
+                                    subscription,
+                                    consumerName,
+                                    consumerId,
+                                    consumerType))
+                            .addListener(writeFuture -> {
+                                if (!writeFuture.isSuccess()) {
+                                    cnx.getPendingRequests().remove(requestId);
+                                    
cnx.removeScalableConsumerSession(consumerId);
+                                    
initialAssignmentFuture.completeExceptionally(
+                                            new 
PulsarClientException(writeFuture.cause()));
+                                }
+                            });
+
+                    responseFuture.whenComplete((assignment, ex) -> {
+                        if (ex != null) {
+                            initialAssignmentFuture.completeExceptionally(ex);
+                            return;
+                        }
+                        List<ActiveSegment> segments = 
toSegmentList(assignment);
+                        currentAssignment.set(segments);
+                        currentEpoch = assignment.getLayoutEpoch();
+                        log.info().attr("epoch", currentEpoch)
+                                .attr("segments", segments.size())
+                                .log("Initial assignment received");
+                        initialAssignmentFuture.complete(segments);
+                    });
+                })
+                .exceptionally(ex -> {
+                    initialAssignmentFuture.completeExceptionally(ex);
+                    return null;
+                });
+        return initialAssignmentFuture;
+    }
+
+    @Override
+    public void onAssignmentUpdate(ScalableConsumerAssignment assignment) {
+        if (closed) {
+            return;
+        }
+        long epoch = assignment.getLayoutEpoch();
+        if (epoch < currentEpoch) {
+            log.info().attr("staleEpoch", epoch).attr("currentEpoch", 
currentEpoch)
+                    .log("Ignoring stale assignment update");
+            return;
+        }
+        List<ActiveSegment> newSegments = toSegmentList(assignment);
+        List<ActiveSegment> oldSegments = 
currentAssignment.getAndSet(newSegments);
+        currentEpoch = epoch;
+        log.info().attr("epoch", epoch).attr("segments", newSegments.size())
+                .log("Assignment updated");
+
+        AssignmentChangeListener l = listener;
+        if (l != null) {
+            try {
+                l.onAssignmentChange(newSegments, oldSegments);
+            } catch (Exception e) {
+                log.error().exception(e).log("Error in assignment change 
listener");
+            }
+        }
+    }
+
+    @Override
+    public void connectionClosed() {
+        log.warn("Scalable consumer session connection closed");
+        cnx = null;
+        if (!initialAssignmentFuture.isDone()) {
+            initialAssignmentFuture.completeExceptionally(
+                    new PulsarClientException("Connection closed before 
initial assignment arrived"));
+        }
+        // TODO: implement automatic re-subscribe on reconnect
+    }
+
+    private static List<ActiveSegment> 
toSegmentList(ScalableConsumerAssignment assignment) {
+        List<ActiveSegment> segments = new 
ArrayList<>(assignment.getSegmentsCount());
+        for (int i = 0; i < assignment.getSegmentsCount(); i++) {
+            ScalableAssignedSegment s = assignment.getSegmentAt(i);
+            segments.add(new ActiveSegment(
+                    s.getSegmentId(),
+                    HashRange.of((int) s.getHashStart(), (int) s.getHashEnd()),
+                    s.getSegmentTopic()));
+        }
+        return Collections.unmodifiableList(segments);
+    }
+
+    List<ActiveSegment> currentAssignment() {
+        return currentAssignment.get();
+    }
+
+    void setListener(AssignmentChangeListener listener) {
+        this.listener = listener;
+    }
+
+    long consumerId() {
+        return consumerId;
+    }
+
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+        closed = true;
+        ClientCnx c = cnx;
+        if (c != null) {
+            c.removeScalableConsumerSession(consumerId);
+            // No close command for now — broker reaps registrations via grace 
timer on
+            // disconnect. A future refactor can add an explicit unsubscribe.
+        }
+    }
+
+    interface AssignmentChangeListener {
+        void onAssignmentChange(List<ActiveSegment> newSegments, 
List<ActiveSegment> oldSegments);
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index 31844f069bb..e33191c2861 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -56,7 +56,8 @@ import 
org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
  * vector. This ensures that acknowledging a single message correctly advances
  * all segments, not just the one it came from.
  */
-final class ScalableStreamConsumer<T> implements StreamConsumer<T>, 
DagWatchClient.LayoutChangeListener {
+final class ScalableStreamConsumer<T>
+        implements StreamConsumer<T>, 
ScalableConsumerClient.AssignmentChangeListener {
 
     private static final Logger LOG = Logger.get(ScalableStreamConsumer.class);
     private final Logger log;
@@ -65,7 +66,7 @@ final class ScalableStreamConsumer<T> implements 
StreamConsumer<T>, DagWatchClie
     private final Schema<T> v5Schema;
     private final org.apache.pulsar.client.api.Schema<T> v4Schema;
     private final ConsumerConfigurationData<T> consumerConf;
-    private final DagWatchClient dagWatch;
+    private final ScalableConsumerClient session;
     private final String topicName;
     private final String subscriptionName;
 
@@ -92,33 +93,36 @@ final class ScalableStreamConsumer<T> implements 
StreamConsumer<T>, DagWatchClie
     private ScalableStreamConsumer(PulsarClientV5 client,
                                    Schema<T> v5Schema,
                                    ConsumerConfigurationData<T> consumerConf,
-                                   DagWatchClient dagWatch) {
+                                   ScalableConsumerClient session,
+                                   String topicName) {
         this.client = client;
         this.v5Schema = v5Schema;
         this.v4Schema = SchemaAdapter.toV4(v5Schema);
         this.consumerConf = consumerConf;
-        this.dagWatch = dagWatch;
-        this.topicName = dagWatch.topicName().toString();
+        this.session = session;
+        this.topicName = topicName;
         this.subscriptionName = consumerConf.getSubscriptionName();
         this.log = LOG.with().attr("topic", topicName).attr("subscription", 
subscriptionName).build();
         this.asyncView = new AsyncStreamConsumerV5<>(this);
     }
 
     /**
-     * Create a fully initialized consumer asynchronously. The returned future 
completes
-     * only after every initial segment has been successfully subscribed. If 
any segment
-     * fails to subscribe, all already-subscribed segments are closed and the 
future
-     * completes exceptionally.
+     * Create a fully initialized consumer asynchronously. The session has 
already
+     * registered with the controller and the {@code initialAssignment} list 
contains
+     * the segments this consumer should attach to. The returned future 
completes only
+     * after every assigned segment has been successfully subscribed.
      */
     static <T> CompletableFuture<StreamConsumer<T>> createAsync(PulsarClientV5 
client,
                                                                 Schema<T> 
v5Schema,
                                                                 
ConsumerConfigurationData<T> consumerConf,
-                                                                DagWatchClient 
dagWatch,
-                                                                
ClientSegmentLayout initialLayout) {
-        ScalableStreamConsumer<T> consumer = new 
ScalableStreamConsumer<>(client, v5Schema, consumerConf, dagWatch);
-        return consumer.subscribeSegments(initialLayout)
+                                                                
ScalableConsumerClient session,
+                                                                String 
topicName,
+                                                                
List<ActiveSegment> initialAssignment) {
+        ScalableStreamConsumer<T> consumer = new ScalableStreamConsumer<>(
+                client, v5Schema, consumerConf, session, topicName);
+        return consumer.subscribeAssigned(initialAssignment)
                 .thenApply(__ -> {
-                    dagWatch.setListener(consumer);
+                    session.setListener(consumer);
                     return (StreamConsumer<T>) consumer;
                 })
                 .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, 
___) -> {
@@ -249,7 +253,7 @@ final class ScalableStreamConsumer<T> implements 
StreamConsumer<T>, DagWatchClie
 
     CompletableFuture<Void> closeAsync() {
         closed = true;
-        dagWatch.close();
+        session.close();
 
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         for (var future : segmentConsumers.values()) {
@@ -262,37 +266,49 @@ final class ScalableStreamConsumer<T> implements 
StreamConsumer<T>, DagWatchClie
                 .whenComplete((__, ___) -> segmentConsumers.clear());
     }
 
-    // --- Layout change handling ---
+    // --- Assignment change handling ---
 
     @Override
-    public void onLayoutChange(ClientSegmentLayout newLayout, 
ClientSegmentLayout oldLayout) {
+    public void onAssignmentChange(List<ActiveSegment> newSegments, 
List<ActiveSegment> oldSegments) {
         // Fully async: safe to run on the netty IO thread that delivered the 
update.
-        subscribeSegments(newLayout).exceptionally(ex -> {
-            log.warn().exceptionMessage(ex).log("Failed to apply layout 
update");
+        subscribeAssigned(newSegments).exceptionally(ex -> {
+            log.warn().exceptionMessage(ex).log("Failed to apply assignment 
update");
             return null;
         });
     }
 
-    private CompletableFuture<Void> subscribeSegments(ClientSegmentLayout 
layout) {
-        // The stream consumer is controller-driven: it only subscribes to the 
segments
-        // the controller currently designates as active. When a segment moves 
out of the
-        // active set (because of a split or merge), we leave its v4 consumer 
alive so
-        // pending acks for messages already pulled from it can still be 
forwarded; the
-        // receive loop closes the consumer naturally once it sees 
TopicTerminated.
-        var activeIds = ConcurrentHashMap.<Long>newKeySet();
-        for (var seg : layout.activeSegments()) {
-            activeIds.add(seg.segmentId());
+    private CompletableFuture<Void> subscribeAssigned(List<ActiveSegment> 
assigned) {
+        // Controller-driven assignment: the broker's SubscriptionCoordinator 
decides
+        // which segments this consumer owns at any moment. We subscribe to 
exactly
+        // those, regardless of whether the controller picked them from the 
active or
+        // sealed set — to the v4 layer they're just per-segment Exclusive 
subscriptions.
+        var assignedIds = ConcurrentHashMap.<Long>newKeySet();
+        for (var seg : assigned) {
+            assignedIds.add(seg.segmentId());
         }
 
-        // Subscribe to newly-active segments asynchronously.
+        // Segments that fell out of our assignment (rebalanced away to another
+        // consumer): close our v4 consumer so the Exclusive lock is released 
and
+        // the new owner can attach. Sealed-and-drained segments take a 
different
+        // path: the receive loop closes them on TopicTerminated.
+        for (var entry : segmentConsumers.entrySet()) {
+            if (!assignedIds.contains(entry.getKey())) {
+                log.info().attr("segmentId", entry.getKey())
+                        .log("Closing consumer for segment removed from 
assignment");
+                entry.getValue().thenAccept(c -> c.closeAsync());
+                segmentConsumers.remove(entry.getKey());
+                latestDelivered.remove(entry.getKey());
+            }
+        }
+
+        // Subscribe to newly-assigned segments.
         List<CompletableFuture<?>> futures = new ArrayList<>();
-        for (var seg : layout.activeSegments()) {
+        for (var seg : assigned) {
             futures.add(segmentConsumers.computeIfAbsent(seg.segmentId(),
                     id -> createSegmentConsumerAsync(seg)));
         }
 
-        log.info().attr("epoch", layout.epoch())
-                .attr("segments", activeIds).log("Stream consumer layout 
applied");
+        log.info().attr("segments", assignedIds).log("Stream consumer 
assignment applied");
         return 
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
     }
 
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index d1ab3109387..f8ae92b014a 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -31,6 +31,7 @@ import 
org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
 import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.v5.schema.Schema;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.naming.TopicName;
 
 /**
@@ -72,11 +73,23 @@ final class StreamConsumerBuilderV5<T> implements 
StreamConsumerBuilder<T> {
         }
 
         TopicName topic = V5Utils.asScalableTopicName(topicName);
-        DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
-
-        return dagWatch.start()
-                .thenCompose(initialLayout -> 
ScalableStreamConsumer.createAsync(
-                        client, v5Schema, conf, dagWatch, initialLayout));
+        // Default the consumer name to a stable random when the user didn't 
set one —
+        // ScalableConsumerClient uses it as the registration key with the 
controller.
+        // Use a full UUID (~122 bits of entropy) so the registration key is 
unique in
+        // practice, even across many concurrent attaches.
+        if (conf.getConsumerName() == null || 
conf.getConsumerName().isEmpty()) {
+            conf.setConsumerName("v5-stream-" + java.util.UUID.randomUUID());
+        }
+        ScalableConsumerClient session = new ScalableConsumerClient(
+                client.v4Client(),
+                topic,
+                conf.getSubscriptionName(),
+                conf.getConsumerName(),
+                ScalableConsumerType.STREAM);
+
+        return session.start()
+                .thenCompose(initialAssignment -> 
ScalableStreamConsumer.createAsync(
+                        client, v5Schema, conf, session, topic.toString(), 
initialAssignment));
     }
 
     @Override
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 7ab0b4f0239..75131f0af1d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -167,6 +167,17 @@ public class ClientCnx extends PulsarHandler {
                     .concurrencyLevel(1)
                     .build();
 
+    /**
+     * Per-consumer scalable subscribe sessions, keyed by the {@code 
consumerId} the V5
+     * client assigned at subscribe time. The broker tags every
+     * {@link CommandScalableTopicAssignmentUpdate} with this id.
+     */
+    private final ConcurrentLongHashMap<ScalableConsumerSession> 
scalableConsumerSessions =
+            ConcurrentLongHashMap.<ScalableConsumerSession>newBuilder()
+                    .expectedItems(4)
+                    .concurrencyLevel(1)
+                    .build();
+
     private final CompletableFuture<Void> connectionFuture = new 
CompletableFuture<Void>();
     private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new 
ConcurrentLinkedQueue<>();
 
@@ -361,6 +372,7 @@ public class ClientCnx extends PulsarHandler {
         transactionMetaStoreHandlers.forEach((id, handler) -> 
handler.connectionClosed(this));
         topicListWatchers.forEach((__, watcher) -> 
watcher.connectionClosed(this));
         dagWatchSessions.forEach((__, session) -> session.connectionClosed());
+        scalableConsumerSessions.forEach((__, session) -> 
session.connectionClosed());
 
         waitingLookupRequests.clear();
 
@@ -368,6 +380,7 @@ public class ClientCnx extends PulsarHandler {
         consumers.clear();
         topicListWatchers.clear();
         dagWatchSessions.clear();
+        scalableConsumerSessions.clear();
 
         timeoutTask.cancel(true);
     }
@@ -1351,6 +1364,65 @@ public class ClientCnx extends PulsarHandler {
         dagWatchSessions.remove(sessionId);
     }
 
+    @Override
+    protected void handleCommandScalableTopicSubscribeResponse(
+            
org.apache.pulsar.common.api.proto.CommandScalableTopicSubscribeResponse cmd) {
+        checkArgument(state == State.Ready);
+
+        long requestId = cmd.getRequestId();
+        log.debug().attr("requestId", requestId).log("Received 
scalableTopicSubscribeResponse");
+
+        if (cmd.hasError()) {
+            CompletableFuture<? extends Object> requestFuture = 
pendingRequests.remove(requestId);
+            if (requestFuture != null && !requestFuture.isDone()) {
+                requestFuture.completeExceptionally(new PulsarClientException(
+                        "Scalable topic subscribe failed: " + cmd.getError()
+                                + (cmd.hasMessage() ? " - " + cmd.getMessage() 
: "")));
+            }
+            return;
+        }
+
+        @SuppressWarnings("unchecked")
+        
TimedCompletableFuture<org.apache.pulsar.common.api.proto.ScalableConsumerAssignment>
+                requestFuture = (TimedCompletableFuture<
+                
org.apache.pulsar.common.api.proto.ScalableConsumerAssignment>) 
pendingRequests.remove(requestId);
+        if (requestFuture == null || requestFuture.isDone()) {
+            log.warn().attr("requestId", requestId)
+                    .log("Received scalable topic subscribe response for 
unknown / completed request");
+            return;
+        }
+        // Defensive copy: the proto payload's backing buffer is reused by the 
decoder
+        // once we return. Build a snapshot the future's downstream consumers 
can hold.
+        var assignment = new 
org.apache.pulsar.common.api.proto.ScalableConsumerAssignment();
+        assignment.copyFrom(cmd.getAssignment());
+        requestFuture.complete(assignment);
+    }
+
+    @Override
+    protected void handleCommandScalableTopicAssignmentUpdate(
+            
org.apache.pulsar.common.api.proto.CommandScalableTopicAssignmentUpdate cmd) {
+        checkArgument(state == State.Ready);
+
+        long consumerId = cmd.getConsumerId();
+        ScalableConsumerSession session = 
scalableConsumerSessions.get(consumerId);
+        if (session == null) {
+            log.warn().attr("consumerId", consumerId)
+                    .log("Received scalable topic assignment update for 
unknown consumer");
+            return;
+        }
+        var assignment = new 
org.apache.pulsar.common.api.proto.ScalableConsumerAssignment();
+        assignment.copyFrom(cmd.getAssignment());
+        session.onAssignmentUpdate(assignment);
+    }
+
+    public void registerScalableConsumerSession(long consumerId, 
ScalableConsumerSession session) {
+        scalableConsumerSessions.put(consumerId, session);
+    }
+
+    public void removeScalableConsumerSession(long consumerId) {
+        scalableConsumerSessions.remove(consumerId);
+    }
+
     /**
      * check serverError and take appropriate action.
      * <ul>
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index baf6198420e..a8c7b46b3db 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -1309,7 +1309,7 @@ public class PulsarClientImpl implements PulsarClient {
         return producerIdGenerator.getAndIncrement();
     }
 
-    long newConsumerId() {
+    public long newConsumerId() {
         return consumerIdGenerator.getAndIncrement();
     }
 
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableConsumerSession.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableConsumerSession.java
new file mode 100644
index 00000000000..ff42253a547
--- /dev/null
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ScalableConsumerSession.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+
+/**
+ * Callback interface for a registered scalable-topic consumer. Implemented by 
the V5
+ * client's per-consumer session. The broker pushes {@link 
CommandScalableTopicAssignmentUpdate}
+ * messages tagged with the consumer's {@code consumerId}; {@link ClientCnx} 
dispatches
+ * those to the matching session.
+ */
+public interface ScalableConsumerSession {
+
+    /**
+     * Called when the broker pushes a new segment assignment to this consumer 
(after a
+     * peer joined/left the subscription or the topic layout changed).
+     */
+    void onAssignmentUpdate(ScalableConsumerAssignment assignment);
+
+    /**
+     * Called when the connection to the broker is closed. Implementations 
should mark
+     * the session as needing reconnection and re-register on the next 
available
+     * connection.
+     */
+    void connectionClosed();
+}
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index c805164ef92..227aae5ee17 100644
--- 
a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ 
b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -106,6 +106,7 @@ import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.api.proto.MessageMetadata;
 import org.apache.pulsar.common.api.proto.ProtocolVersion;
 import org.apache.pulsar.common.api.proto.ScalableConsumerAssignment;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.api.proto.ScalableTopicDAG;
 import org.apache.pulsar.common.api.proto.Schema;
 import org.apache.pulsar.common.api.proto.ServerError;
@@ -1709,6 +1710,26 @@ public class Commands {
         return serializeWithSize(cmd);
     }
 
+    /**
+     * Client -> Broker: register as a scalable consumer (Stream or 
Checkpoint) and
+     * request the initial segment assignment. The broker leader persists the
+     * registration and replies with a {@link 
CommandScalableTopicSubscribeResponse}.
+     */
+    public static ByteBuf newScalableTopicSubscribe(long requestId, String 
topic,
+                                                     String subscription, 
String consumerName,
+                                                     long consumerId,
+                                                     ScalableConsumerType 
consumerType) {
+        BaseCommand cmd = localCmd(Type.SCALABLE_TOPIC_SUBSCRIBE);
+        cmd.setScalableTopicSubscribe()
+                .setRequestId(requestId)
+                .setTopic(topic)
+                .setSubscription(subscription)
+                .setConsumerName(consumerName)
+                .setConsumerId(consumerId)
+                .setConsumerType(consumerType);
+        return serializeWithSize(cmd);
+    }
+
     /**
      * Broker -> Client: response to a scalable-topic subscribe request. On 
success the
      * caller must populate the nested {@link ScalableConsumerAssignment} via

Reply via email to