This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 145b3a55d0f [fix][client] PIP-468: V5 CheckpointConsumer consumer-group support (#25622)
145b3a55d0f is described below

commit 145b3a55d0fe04ade57c18f3a0ce15c87239d752
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 07:39:46 2026 -0700

    [fix][client] PIP-468: V5 CheckpointConsumer consumer-group support (#25622)
---
 .../api/v5/V5CheckpointConsumerGroupTest.java      | 491 +++++++++++++++++++++
 .../client/api/v5/CheckpointConsumerBuilder.java   |  23 +
 .../impl/v5/CheckpointConsumerBuilderV5.java       |  29 +-
 .../client/impl/v5/ScalableCheckpointConsumer.java | 162 +++++--
 .../client/impl/v5/StreamConsumerBuilderV5.java    |   4 +-
 .../apache/pulsar/client/impl/v5/V5RandomIds.java  |  50 +++
 6 files changed, 710 insertions(+), 49 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerGroupTest.java
new file mode 100644
index 00000000000..80673085f83
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerGroupTest.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link CheckpointConsumerBuilder#consumerGroup(String)} — multiple
+ * checkpoint consumers in the same group share the topic's segments via the broker's
+ * subscription coordinator, identical to the StreamConsumer pattern but with
+ * client-side position state instead of broker-side cursors.
+ *
+ * <p>Two key invariants we assert here that don't apply to StreamConsumer:
+ * <ul>
+ *   <li><b>Position state is client-side.</b> A checkpoint consumer resumes from
+ *       whatever {@link Checkpoint} the application passes at create time. The broker
+ *       does not track per-consumer cursors.</li>
+ *   <li><b>No leftover consumers.</b> Once every member of a group goes away and the
+ *       grace timer has fired, the broker's stats report no live consumers for that
+ *       group on the topic (the subscription entry is either absent or has zero
+ *       consumers).</li>
+ * </ul>
+ *
+ * <p>The unmanaged (no-group) path is covered as well: {@code receiveMulti} batching
+ * and resuming from a saved checkpoint across segment splits and merges.
+ */
+public class V5CheckpointConsumerGroupTest extends V5ClientBaseTest {
+
+    @Test
+    public void testConsumersInGroupShareSegments() throws Exception {
+        String topic = newScalableTopic(4);
+        String group = "group-share";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Two checkpoint consumers in the same group: each should end up with a
+        // disjoint subset of segments via the controller's assignment.
+        @Cleanup
+        CheckpointConsumer<String> a = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.earliest())
+                .create();
+        @Cleanup
+        CheckpointConsumer<String> b = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        int n = 100;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        Set<String> received = ConcurrentHashMap.newKeySet();
+        Set<String> aGot = ConcurrentHashMap.newKeySet();
+        Set<String> bGot = ConcurrentHashMap.newKeySet();
+        Drainer t1 = drainTo(a, received, aGot);
+        Drainer t2 = drainTo(b, received, bGot);
+        // Sets collapse duplicates, so the raw delivery count is tracked separately.
+        int delivered = t1.awaitDrained() + t2.awaitDrained();
+
+        assertEquals(received, sent, "every produced message must be delivered to the group");
+
+        Set<String> overlap = new HashSet<>(aGot);
+        overlap.retainAll(bGot);
+        assertTrue(overlap.isEmpty(), "no message should be delivered to both consumers, overlap=" + overlap);
+
+        assertEquals(delivered, n, "every message must be delivered exactly once across the group");
+
+        assertTrue(!aGot.isEmpty() && !bGot.isEmpty(),
+                "controller must split segments across both consumers"
+                        + " (a=" + aGot.size() + " b=" + bGot.size() + ")");
+    }
+
+    @Test
+    public void testGroupScalesUp() throws Exception {
+        String topic = newScalableTopic(2);
+        String group = "group-scale";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Phase 1: a single consumer in the group serves both segments. @Cleanup makes
+        // sure it is closed even when a later assertion fails.
+        @Cleanup
+        CheckpointConsumer<String> c1 = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        int phaseN = 30;
+        produceBatch(producer, "p1-", phaseN);
+        Set<String> p1Received = drainExpected(c1, phaseN);
+        assertEquals(p1Received.size(), phaseN, "single consumer should drain its phase exactly");
+
+        // Phase 2: add a second consumer. CheckpointConsumer has no broker-side cursor,
+        // so every consumer starts from whatever Checkpoint it provides. Use latest so
+        // c2 only reads messages produced after it joins (avoids re-reading phase 1
+        // from the segments it's assigned).
+        @Cleanup
+        CheckpointConsumer<String> c2 = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.latest())
+                .create();
+
+        produceBatch(producer, "p2-", phaseN);
+        Set<String> c1Phase2 = ConcurrentHashMap.newKeySet();
+        Set<String> c2Phase2 = ConcurrentHashMap.newKeySet();
+        Set<String> p2Received = ConcurrentHashMap.newKeySet();
+        Drainer d1 = drainTo(c1, p2Received, c1Phase2);
+        Drainer d2 = drainTo(c2, p2Received, c2Phase2);
+        d1.awaitDrained();
+        d2.awaitDrained();
+        assertEquals(p2Received.size(), phaseN, "phase 2 must be fully delivered across both consumers");
+        assertTrue(!c1Phase2.isEmpty() && !c2Phase2.isEmpty(),
+                "phase 2 must split across both consumers: c1=" + c1Phase2.size()
+                        + " c2=" + c2Phase2.size());
+    }
+
+    @Test
+    public void testResumesFromClientProvidedCheckpoint() throws Exception {
+        // Single segment so per-iteration value assertions are deterministic — order
+        // is preserved within a segment.
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Use the unmanaged (no-group) checkpoint consumer here: this test isolates
+        // the "position state lives entirely in the client checkpoint" property.
+        // Adding a consumer group would mix in the broker's session-tracking — that's
+        // covered by the other tests in this class.
+        CheckpointConsumer<String> first = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        int prefix = 30;
+        int suffix = 30;
+        Checkpoint mid;
+        try {
+            for (int i = 0; i < prefix; i++) {
+                producer.newMessage().key("k-" + i).value("v-" + i).send();
+            }
+            for (int i = 0; i < prefix; i++) {
+                Message<String> msg = first.receive(Duration.ofSeconds(5));
+                assertNotNull(msg, "missed prefix message #" + i);
+                assertEquals(msg.value(), "v-" + i, "prefix message out of order");
+                assertEquals(msg.key().orElse(null), "k-" + i, "prefix key out of order");
+            }
+            mid = first.checkpoint();
+
+            // Produce more data, then close the consumer entirely.
+            for (int i = 0; i < suffix; i++) {
+                producer.newMessage().key("k-" + (prefix + i)).value("v-" + (prefix + i)).send();
+            }
+        } finally {
+            // Closed on every path so a failed prefix assertion does not leak the reader.
+            first.close();
+        }
+
+        // A fresh consumer that hands the saved checkpoint as its start position must
+        // resume from the message immediately after the one that produced the
+        // checkpoint — no broker state is involved.
+        @Cleanup
+        CheckpointConsumer<String> second = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(mid)
+                .create();
+
+        for (int i = 0; i < suffix; i++) {
+            Message<String> msg = second.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed resumed message #" + i);
+            assertEquals(msg.value(), "v-" + (prefix + i),
+                    "resume from checkpoint delivered the wrong value at index " + i);
+            assertEquals(msg.key().orElse(null), "k-" + (prefix + i),
+                    "resume from checkpoint delivered the wrong key at index " + i);
+        }
+        assertNull(second.receive(Duration.ofMillis(200)),
+                "no extra messages should arrive past the produced suffix");
+    }
+
+    @Test
+    public void testNoMetadataLeftAfterAllGroupMembersClose() throws Exception {
+        String topic = newScalableTopic(2);
+        String group = "group-cleanup";
+
+        // Open two members of the group on dedicated clients. Closing the client
+        // severs the underlying connection, which is what the broker observes as a
+        // disconnect — only then does the grace timer arm and eventually evict the
+        // session. (consumer.close() alone removes only the local registration.)
+        PulsarClient cli1 = newV5Client();
+        cli1.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.earliest())
+                .create();
+        PulsarClient cli2 = newV5Client();
+        cli2.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .consumerGroup(group)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        cli1.close();
+        cli2.close();
+
+        // Once the broker grace timer fires (default 60s; SharedPulsarCluster will
+        // lower this once PR #25619 lands), the controller evicts the sessions and
+        // deletes the persisted registrations. After that, the topic stats must report
+        // zero consumers in the group.
+        Awaitility.await().atMost(Duration.ofSeconds(90)).untilAsserted(() -> {
+            var stats = admin.scalableTopics().getStats(topic);
+            var sub = stats.getSubscriptions().get(group);
+            assertTrue(sub == null || sub.consumerCount() == 0,
+                    "group '" + group + "' must leave no consumers behind, got "
+                            + (sub == null ? "null" : sub.consumerCount() + " consumers"));
+        });
+    }
+
+    @Test
+    public void testReceiveMultiReturnsBatch() throws Exception {
+        // Single segment so messages stay in send order.
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        int n = 20;
+        for (int i = 0; i < n; i++) {
+            producer.newMessage().key("k-" + i).value("v-" + i).send();
+        }
+
+        // receiveMulti returns up to maxNumMessages within the timeout, ordered as
+        // they arrived from the segment. Drain the full batch in chunks of maxBatch.
+        int maxBatch = 8;
+        Set<String> received = new HashSet<>();
+        int total = 0;
+        long deadline = System.currentTimeMillis() + 10_000L;
+        while (received.size() < n && System.currentTimeMillis() < deadline) {
+            Messages<String> batch = consumer.receiveMulti(maxBatch, Duration.ofSeconds(1));
+            assertNotNull(batch);
+            int inBatch = 0;
+            for (Message<String> msg : batch) {
+                received.add(msg.value());
+                inBatch++;
+            }
+            assertTrue(inBatch <= maxBatch, "receiveMulti returned more than maxNumMessages: " + inBatch);
+            total += inBatch;
+        }
+        assertEquals(received.size(), n,
+                "receiveMulti must surface every produced message");
+        // The set above hides duplicates; the raw count pins "exactly once".
+        assertEquals(total, n,
+                "receiveMulti must surface every produced message exactly once");
+    }
+
+    @Test
+    public void testCheckpointResumeAcrossSplit() throws Exception {
+        // Single initial segment so the checkpoint position is unambiguous.
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Read half of the prefix and take a mid-stream checkpoint (still on the
+        // original single segment).
+        int prefix = 20;
+        Set<String> sent = new HashSet<>();
+        Set<String> consumed = new HashSet<>();
+        Checkpoint mid = consumeHalfThenCheckpoint(topic, producer, prefix, sent, consumed);
+
+        // Split the original segment, then produce more messages — these route to the
+        // new active children.
+        java.util.List<Long> active = activeSegmentIds(topic);
+        assertTrue(!active.isEmpty(), "topic must have an active segment to split");
+        long parentId = active.get(0);
+        admin.scalableTopics().splitSegment(topic, parentId);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(activeSegmentIds(topic).size(), 2, "split must produce 2 active children"));
+
+        int suffix = 20;
+        for (int i = 0; i < suffix; i++) {
+            String v = "post-" + i;
+            producer.newMessage().key("k-suffix-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Resume from the saved checkpoint. The unmanaged consumer subscribes to
+        // active + sealed segments, so the saved position on the now-sealed parent
+        // still drains its remainder, and the children deliver the post-split data.
+        @Cleanup
+        CheckpointConsumer<String> resumed = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(mid)
+                .create();
+
+        // Expected: prefix/2 unread parent messages + all suffix messages.
+        int expected = (prefix - prefix / 2) + suffix;
+        Set<String> received = drainExpected(resumed, expected);
+        Set<String> expectedSet = new HashSet<>(sent);
+        expectedSet.removeAll(consumed);
+        assertEquals(received, expectedSet,
+                "resume across split must replay the parent tail and every child message");
+    }
+
+    @Test
+    public void testCheckpointResumeAcrossMerge() throws Exception {
+        // Two initial segments so we can merge them.
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        int prefix = 20;
+        Set<String> sent = new HashSet<>();
+        Set<String> consumed = new HashSet<>();
+        Checkpoint mid = consumeHalfThenCheckpoint(topic, producer, prefix, sent, consumed);
+
+        // Merge the two segments into one child — the originals seal.
+        java.util.List<Long> activeIds = activeSegmentIds(topic);
+        assertEquals(activeIds.size(), 2);
+        admin.scalableTopics().mergeSegments(topic, activeIds.get(0), activeIds.get(1));
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(activeSegmentIds(topic).size(), 1, "merge must collapse to 1 active segment"));
+
+        int suffix = 20;
+        for (int i = 0; i < suffix; i++) {
+            String v = "post-" + i;
+            producer.newMessage().key("k-suffix-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        @Cleanup
+        CheckpointConsumer<String> resumed = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(mid)
+                .create();
+
+        int expected = (prefix - prefix / 2) + suffix;
+        Set<String> received = drainExpected(resumed, expected);
+        Set<String> expectedSet = new HashSet<>(sent);
+        expectedSet.removeAll(consumed);
+        assertEquals(received, expectedSet,
+                "resume across merge must replay both sealed parents' tails and the merged child");
+    }
+
+    // --- Helpers ---
+
+    /** Sends {@code n} messages with keys {@code k-0..} and values {@code prefix + i}. */
+    private void produceBatch(Producer<String> producer, String prefix, int n) throws Exception {
+        for (int i = 0; i < n; i++) {
+            producer.newMessage().key("k-" + i).value(prefix + i).send();
+        }
+    }
+
+    /**
+     * Receives until {@code expected} distinct values have been seen or 30s elapse,
+     * polling with a 1s receive timeout. Returns the distinct values received.
+     */
+    private Set<String> drainExpected(CheckpointConsumer<String> consumer, int expected) throws Exception {
+        Set<String> received = new HashSet<>();
+        long deadline = System.currentTimeMillis() + 30_000L;
+        while (received.size() < expected && System.currentTimeMillis() < deadline) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+            if (msg != null) {
+                received.add(msg.value());
+            }
+        }
+        return received;
+    }
+
+    /** Starts a {@link Drainer} over {@code consumer} and returns it (already running). */
+    private Drainer drainTo(CheckpointConsumer<String> consumer, Set<String> all, Set<String> mine) {
+        Drainer drainer = new Drainer(consumer, all, mine);
+        drainer.start();
+        return drainer;
+    }
+
+    /**
+     * Opens an unmanaged consumer at the earliest position, produces {@code prefix}
+     * messages ({@code pre-i} keyed {@code k-i}, recorded into {@code sent}), receives
+     * {@code prefix / 2} of them (recorded into {@code consumed}) and returns the
+     * checkpoint taken at that point. The consumer is closed before returning, on
+     * every path.
+     */
+    private Checkpoint consumeHalfThenCheckpoint(String topic, Producer<String> producer, int prefix,
+                                                 Set<String> sent, Set<String> consumed) throws Exception {
+        CheckpointConsumer<String> first = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+        try {
+            for (int i = 0; i < prefix; i++) {
+                String v = "pre-" + i;
+                producer.newMessage().key("k-" + i).value(v).send();
+                sent.add(v);
+            }
+            for (int i = 0; i < prefix / 2; i++) {
+                Message<String> msg = first.receive(Duration.ofSeconds(5));
+                assertNotNull(msg);
+                consumed.add(msg.value());
+            }
+            return first.checkpoint();
+        } finally {
+            first.close();
+        }
+    }
+
+    /** Returns the ids of the topic's currently active segments, in metadata iteration order. */
+    private java.util.List<Long> activeSegmentIds(String topic) throws Exception {
+        java.util.List<Long> ids = new java.util.ArrayList<>();
+        for (var seg : admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+            if (seg.isActive()) {
+                ids.add(seg.getSegmentId());
+            }
+        }
+        return ids;
+    }
+
+    /**
+     * Background thread that drains a consumer until a 1s receive returns nothing,
+     * recording each value into a shared set and a per-consumer set. A failure inside
+     * the thread is captured and re-thrown by {@link #awaitDrained()} rather than
+     * swallowed, so a broken consumer fails the test instead of looking idle.
+     */
+    private static final class Drainer extends Thread {
+        private final CheckpointConsumer<String> consumer;
+        private final Set<String> all;
+        private final Set<String> mine;
+        // Written only by this thread; read by the test thread after join(), which
+        // establishes happens-before, so no extra synchronization is needed.
+        private int delivered;
+        private Throwable failure;
+
+        Drainer(CheckpointConsumer<String> consumer, Set<String> all, Set<String> mine) {
+            super("checkpoint-consumer-drainer");
+            this.consumer = consumer;
+            this.all = all;
+            this.mine = mine;
+        }
+
+        @Override
+        public void run() {
+            try {
+                Message<String> msg;
+                while ((msg = consumer.receive(Duration.ofSeconds(1))) != null) {
+                    all.add(msg.value());
+                    mine.add(msg.value());
+                    delivered++;
+                }
+            } catch (Throwable t) {
+                failure = t;
+            }
+        }
+
+        /**
+         * Waits for the drain to finish and returns the raw number of messages
+         * received (duplicates included).
+         *
+         * @throws AssertionError if the drain thread failed
+         */
+        int awaitDrained() throws InterruptedException {
+            join();
+            if (failure != null) {
+                throw new AssertionError("checkpoint consumer drain failed", failure);
+            }
+            return delivered;
+        }
+    }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
index 615473a382c..e9c6153fec5 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
@@ -83,6 +83,29 @@ public interface CheckpointConsumerBuilder<T> {
      */
     CheckpointConsumerBuilder<T> consumerName(String name);
 
+    /**
+     * Join a named consumer group on this scalable topic. All consumers that pass the
+     * same {@code group} share the topic's segments via the broker's subscription
+     * coordinator: each segment is assigned to exactly one consumer in the group at a
+     * time, and segments rebalance automatically as consumers join or leave.
+     *
+     * <p>When unset (the default), or set to {@code null} or an empty string, the
+     * consumer is unmanaged: it independently reads every segment from
+     * {@link #startPosition the configured start position}, unaffected by any other
+     * consumer. This matches the original reader-style behavior of
+     * {@link CheckpointConsumer}.
+     *
+     * <p>Unlike {@link StreamConsumerBuilder}'s subscription, joining a group does
+     * <em>not</em> cause the broker to persist a cursor: each consumer still resumes
+     * from the {@code startPosition} (or a checkpoint deserialized from a previous
+     * run) it provides at create time. The group only affects how segments are
+     * distributed across the live consumer set. Once every member of the group goes
+     * away, no broker-side state remains for it. Note that a member's session is
+     * evicted only after the broker observes its connection drop and the broker's
+     * disconnect grace period elapses; closing the consumer alone may not be enough.
+     *
+     * @param group the consumer group name; {@code null} or empty leaves the consumer
+     *              unmanaged
+     * @return this builder instance for chaining
+     */
+    CheckpointConsumerBuilder<T> consumerGroup(String group);
+
     /**
      * Configure end-to-end message encryption for decryption.
      *
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
index c525d02d878..c65e16d0fc5 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
@@ -26,6 +26,7 @@ import org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder;
 import org.apache.pulsar.client.api.v5.PulsarClientException;
 import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
 import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
 import org.apache.pulsar.common.naming.TopicName;
 
 /**
@@ -38,6 +39,7 @@ final class CheckpointConsumerBuilderV5<T> implements CheckpointConsumerBuilder<
     private String topicName;
     private Checkpoint startPosition = CheckpointV5.LATEST;
     private String consumerName;
+    private String consumerGroup;
     private EncryptionPolicy encryptionPolicy;
 
     CheckpointConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
@@ -65,10 +67,27 @@ final class CheckpointConsumerBuilderV5<T> implements CheckpointConsumerBuilder<
         }
 
         TopicName topic = V5Utils.asScalableTopicName(topicName);
-        DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
 
+        if (consumerGroup != null && !consumerGroup.isEmpty()) {
+            // Managed: register with the broker's subscription coordinator 
under the
+            // configured group. Each consumer in the group ends up with a 
disjoint set
+            // of segments.
+            String name = consumerName != null && !consumerName.isEmpty()
+                    ? consumerName
+                    : "v5-checkpoint-" + V5RandomIds.randomAlphanumeric(8);
+            ScalableConsumerClient session = new ScalableConsumerClient(
+                    client.v4Client(), topic, consumerGroup, name,
+                    ScalableConsumerType.CHECKPOINT);
+            return session.start()
+                    .thenCompose(initialAssignment -> 
ScalableCheckpointConsumer.createManagedAsync(
+                            client, v5Schema, topic.toString(), session, 
initialAssignment,
+                            startPosition, name));
+        }
+
+        // Unmanaged: read every active segment, no broker-side state.
+        DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
         return dagWatch.start()
-                .thenCompose(initialLayout -> 
ScalableCheckpointConsumer.createAsync(
+                .thenCompose(initialLayout -> 
ScalableCheckpointConsumer.createUnmanagedAsync(
                         client, v5Schema, dagWatch, initialLayout, 
startPosition, consumerName));
     }
 
@@ -90,6 +109,12 @@ final class CheckpointConsumerBuilderV5<T> implements CheckpointConsumerBuilder<
         return this;
     }
 
+    /**
+     * Remembers the requested consumer group. A {@code null} or empty value is kept
+     * as-is; the builder's create path only joins a group when the name is non-empty,
+     * otherwise it builds an unmanaged consumer.
+     */
+    @Override
+    public CheckpointConsumerBuilderV5<T> consumerGroup(String group) {
+        consumerGroup = group;
+        return this;
+    }
+
     @Override
     public CheckpointConsumerBuilderV5<T> encryptionPolicy(EncryptionPolicy 
policy) {
         this.encryptionPolicy = policy;
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 34413a8153f..9288780f39b 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.TimeUnit;
+import 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.v5.Checkpoint;
 import org.apache.pulsar.client.api.v5.CheckpointConsumer;
@@ -45,11 +46,23 @@ import org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
 /**
  * V5 CheckpointConsumer implementation for scalable topics.
  *
- * <p>Maintains per-segment v4 Readers (no subscription). Messages from all 
segments
- * are multiplexed into a single receive queue. Supports creating checkpoints 
(atomic
- * snapshots of positions across all segments) and seeking to previously saved 
checkpoints.
+ * <p>Maintains per-segment v4 Readers (no subscription cursor — position 
state lives
+ * client-side and is materialized as {@link Checkpoint} via {@link 
#checkpoint()}).
+ * Messages from all segments are multiplexed into a single receive queue. 
Supports
+ * creating checkpoints (atomic snapshots of positions across all segments) 
and seeking
+ * to previously saved checkpoints.
+ *
+ * <p>Two segment sources are supported, picked by the caller:
+ * <ul>
+ *   <li><b>Unmanaged</b> — the consumer reads every active segment (driven by 
a
+ *       {@link DagWatchClient}). Multiple unmanaged consumers each 
independently see
+ *       the full stream.</li>
+ *   <li><b>Managed (consumer group)</b> — segments are assigned by the 
broker's
+ *       subscription coordinator (driven by a {@link ScalableConsumerClient}).
+ *       Consumers in the same group share segments and rebalance on 
join/leave.</li>
+ * </ul>
  */
-final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, 
DagWatchClient.LayoutChangeListener {
+final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T> {
 
     private static final Logger LOG = 
Logger.get(ScalableCheckpointConsumer.class);
     private final Logger log;
@@ -57,7 +70,7 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, DagW
     private final PulsarClientV5 client;
     private final Schema<T> v5Schema;
     private final org.apache.pulsar.client.api.Schema<T> v4Schema;
-    private final DagWatchClient dagWatch;
+    private final AutoCloseable sourceHandle;
     private final String topicName;
     private final Checkpoint startPosition;
     private final String consumerName;
@@ -76,14 +89,15 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, DagW
 
     private ScalableCheckpointConsumer(PulsarClientV5 client,
                                        Schema<T> v5Schema,
-                                       DagWatchClient dagWatch,
+                                       String topicName,
+                                       AutoCloseable sourceHandle,
                                        Checkpoint startPosition,
                                        String consumerName) {
         this.client = client;
         this.v5Schema = v5Schema;
         this.v4Schema = SchemaAdapter.toV4(v5Schema);
-        this.dagWatch = dagWatch;
-        this.topicName = dagWatch.topicName().toString();
+        this.sourceHandle = sourceHandle;
+        this.topicName = topicName;
         this.startPosition = startPosition;
         this.consumerName = consumerName;
         this.log = LOG.with().attr("topic", topicName).build();
@@ -91,22 +105,56 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, DagW
     }
 
     /**
-     * Create a fully initialized consumer asynchronously. The returned future 
completes
-     * only after every initial segment reader has been successfully created. 
If any
-     * reader creation fails, all already-created readers are closed and the 
future
-     * completes exceptionally.
+     * Create an unmanaged consumer that reads every segment in the active 
layout
+     * (independent of any consumer group). Driven by a {@link 
DagWatchClient}, which
+     * is closed on consumer close.
      */
-    static <T> CompletableFuture<CheckpointConsumer<T>> 
createAsync(PulsarClientV5 client,
-                                                                    Schema<T> 
v5Schema,
-                                                                    
DagWatchClient dagWatch,
-                                                                    
ClientSegmentLayout initialLayout,
-                                                                    Checkpoint 
startPosition,
-                                                                    String 
consumerName) {
+    static <T> CompletableFuture<CheckpointConsumer<T>> createUnmanagedAsync(
+            PulsarClientV5 client, Schema<T> v5Schema, DagWatchClient dagWatch,
+            ClientSegmentLayout initialLayout, Checkpoint startPosition, 
String consumerName) {
         ScalableCheckpointConsumer<T> consumer = new 
ScalableCheckpointConsumer<>(
-                client, v5Schema, dagWatch, startPosition, consumerName);
-        return consumer.createSegmentReaders(initialLayout)
+                client, v5Schema, dagWatch.topicName().toString(), dagWatch, 
startPosition, consumerName);
+        return consumer.applyAssignment(allSegmentsOf(initialLayout))
                 .thenApply(__ -> {
-                    dagWatch.setListener(consumer);
+                    dagWatch.setListener((newLayout, oldLayout) -> 
consumer.onAssignmentChange(
+                            allSegmentsOf(newLayout),
+                            oldLayout != null ? allSegmentsOf(oldLayout) : 
List.of()));
+                    return (CheckpointConsumer<T>) consumer;
+                })
+                .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, 
___) -> {
+                    throw ex instanceof CompletionException ce ? ce : new 
CompletionException(ex);
+                }));
+    }
+
+    /**
+     * Active + sealed segments. The unmanaged checkpoint consumer needs to 
subscribe
+     * to sealed segments too so a {@link Checkpoint} taken before a split or 
merge
+     * still resumes correctly: the reader on each sealed parent picks up from 
the
+     * saved position and drains its remaining backlog before naturally 
exiting on
+     * {@code TopicTerminated}.
+     */
+    private static List<ActiveSegment> allSegmentsOf(ClientSegmentLayout 
layout) {
+        List<ActiveSegment> all = new ArrayList<>(
+                layout.activeSegments().size() + 
layout.sealedSegments().size());
+        all.addAll(layout.activeSegments());
+        all.addAll(layout.sealedSegments());
+        return all;
+    }
+
+    /**
+     * Create a managed consumer that reads only the segments the broker's 
subscription
+     * coordinator assigns to it within the named consumer group. Driven by a
+     * {@link ScalableConsumerClient}, which is closed on consumer close.
+     */
+    static <T> CompletableFuture<CheckpointConsumer<T>> createManagedAsync(
+            PulsarClientV5 client, Schema<T> v5Schema, String topicName,
+            ScalableConsumerClient session, List<ActiveSegment> 
initialAssignment,
+            Checkpoint startPosition, String consumerName) {
+        ScalableCheckpointConsumer<T> consumer = new 
ScalableCheckpointConsumer<>(
+                client, v5Schema, topicName, session, startPosition, 
consumerName);
+        return consumer.applyAssignment(initialAssignment)
+                .thenApply(__ -> {
+                    session.setListener(consumer::onAssignmentChange);
                     return (CheckpointConsumer<T>) consumer;
                 })
                 .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, 
___) -> {
@@ -282,7 +330,11 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, DagW
 
     CompletableFuture<Void> closeAsync() {
         closed = true;
-        dagWatch.close();
+        try {
+            sourceHandle.close();
+        } catch (Exception e) {
+            log.warn().exceptionMessage(e).log("Error closing segment source");
+        }
 
         List<CompletableFuture<Void>> futures = new ArrayList<>();
         for (var future : segmentReaders.values()) {
@@ -295,42 +347,42 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, DagW
                 .whenComplete((__, ___) -> segmentReaders.clear());
     }
 
-    // --- Layout change handling ---
+    // --- Assignment change handling ---
 
-    @Override
-    public void onLayoutChange(ClientSegmentLayout newLayout, 
ClientSegmentLayout oldLayout) {
+    private void onAssignmentChange(List<ActiveSegment> newSegments, 
List<ActiveSegment> oldSegments) {
         // Fully async: safe to run on the netty IO thread that delivered the 
update.
-        createSegmentReaders(newLayout).exceptionally(ex -> {
-            log.warn().exceptionMessage(ex).log("Failed to apply layout 
update");
+        applyAssignment(newSegments).exceptionally(ex -> {
+            log.warn().exceptionMessage(ex).log("Failed to apply segment 
assignment");
             return null;
         });
     }
 
-    private CompletableFuture<Void> createSegmentReaders(ClientSegmentLayout 
layout) {
-        var activeIds = ConcurrentHashMap.<Long>newKeySet();
-        for (var seg : layout.activeSegments()) {
-            activeIds.add(seg.segmentId());
+    private CompletableFuture<Void> applyAssignment(List<ActiveSegment> 
assigned) {
+        var assignedIds = ConcurrentHashMap.<Long>newKeySet();
+        for (var seg : assigned) {
+            assignedIds.add(seg.segmentId());
         }
 
-        // Close readers for segments that are no longer active 
(fire-and-forget).
+        // Close readers for segments removed from the assignment (sealed, or 
rebalanced
+        // away to another consumer in the same group).
         for (var entry : segmentReaders.entrySet()) {
-            if (!activeIds.contains(entry.getKey())) {
+            if (!assignedIds.contains(entry.getKey())) {
                 log.info().attr("segmentId", entry.getKey())
-                        .log("Closing reader for sealed segment");
+                        .log("Closing reader for segment removed from 
assignment");
                 entry.getValue().thenAccept(r -> r.closeAsync());
                 segmentReaders.remove(entry.getKey());
+                lastReceivedPositions.remove(entry.getKey());
             }
         }
 
         // Create readers for new segments asynchronously.
         List<CompletableFuture<?>> futures = new ArrayList<>();
-        for (var seg : layout.activeSegments()) {
+        for (var seg : assigned) {
             futures.add(segmentReaders.computeIfAbsent(seg.segmentId(),
                     id -> createSegmentReaderAsync(seg)));
         }
 
-        log.info().attr("epoch", layout.epoch())
-                .attr("segments", activeIds).log("Checkpoint consumer layout 
applied");
+        log.info().attr("segments", assignedIds).log("Checkpoint consumer 
assignment applied");
         return 
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
     }
 
@@ -353,18 +405,21 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, DagW
     }
 
     private org.apache.pulsar.client.api.MessageId resolveStartPosition(long 
segmentId) {
-        // If we have a regular checkpoint, use the segment's position
         if (startPosition instanceof CheckpointV5 cp) {
             var pos = cp.segmentPositions().get(segmentId);
             if (pos != null) {
                 return pos;
             }
+            // The checkpoint has no position for this segment, which means the
+            // segment didn't exist when the checkpoint was taken (it's a child
+            // produced by a split or merge after the snapshot). All of its 
data is
+            // therefore newer than the checkpoint — read from the earliest.
+            return org.apache.pulsar.client.api.MessageId.earliest;
         }
-        // For sentinel checkpoints or missing segments, use earliest/latest
         if (startPosition == CheckpointV5.EARLIEST) {
             return org.apache.pulsar.client.api.MessageId.earliest;
         }
-        // Default to latest
+        // CheckpointV5.LATEST and anything else: latest.
         return org.apache.pulsar.client.api.MessageId.latest;
     }
 
@@ -381,11 +436,30 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, DagW
                 startReadLoop(reader, segmentId);
             }
         }).exceptionally(ex -> {
-            if (!closed) {
-                log.warn().attr("segmentId", segmentId)
-                        .exception(ex).log("Error reading from segment, 
retrying");
-                startReadLoop(reader, segmentId);
+            Throwable cause = ex instanceof CompletionException ce && 
ce.getCause() != null
+                    ? ce.getCause() : ex;
+            if (closed || cause instanceof AlreadyClosedException) {
+                // The whole consumer is shutting down or this reader was 
closed
+                // externally (segment sealed or rebalanced away). Stop the 
loop.
+                return null;
+            }
+            if (cause instanceof 
org.apache.pulsar.client.api.PulsarClientException
+                    .TopicTerminatedException) {
+                // Sealed segment fully drained server-side. Close the reader 
and drop
+                // it from the map so resources are released; the segment's 
data has
+                // already crossed into messageQueue.
+                log.info().attr("segmentId", segmentId)
+                        .log("Sealed segment drained, closing reader");
+                segmentReaders.remove(segmentId);
+                reader.closeAsync();
+                return null;
             }
+            log.warn().attr("segmentId", segmentId)
+                    .exception(ex).log("Error reading from segment, retrying");
+            // Hop to the v4 client's internal executor so repeated 
synchronous failures
+            // don't grow the stack unboundedly.
+            client.v4Client().getInternalExecutorService()
+                    .execute(() -> startReadLoop(reader, segmentId));
             return null;
         });
     }
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index f8ae92b014a..d3c356ecb55 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -75,10 +75,8 @@ final class StreamConsumerBuilderV5<T> implements StreamConsumerBuilder<T> {
         TopicName topic = V5Utils.asScalableTopicName(topicName);
         // Default the consumer name to a stable random when the user didn't 
set one —
         // ScalableConsumerClient uses it as the registration key with the 
controller.
-        // Use a full UUID (~122 bits of entropy) so the registration key is 
unique in
-        // practice, even across many concurrent attaches.
         if (conf.getConsumerName() == null || 
conf.getConsumerName().isEmpty()) {
-            conf.setConsumerName("v5-stream-" + java.util.UUID.randomUUID());
+            conf.setConsumerName("v5-stream-" + 
V5RandomIds.randomAlphanumeric(8));
         }
         ScalableConsumerClient session = new ScalableConsumerClient(
                 client.v4Client(),
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5RandomIds.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5RandomIds.java
new file mode 100644
index 00000000000..82f3444fbe5
--- /dev/null
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5RandomIds.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import java.security.SecureRandom;
+
+/**
+ * Random short identifiers used for default consumer / producer names that 
the V5
+ * builders generate when the user did not provide one.
+ *
+ * <p>Eight alphanumeric characters from a {@link SecureRandom} source give 
~48 bits of
+ * entropy — enough to avoid collisions in practice while keeping log lines 
and broker
+ * registration keys short.
+ */
+final class V5RandomIds {
+
+    private static final SecureRandom RANDOM = new SecureRandom();
+    private static final String ALPHABET =
+            "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+
+    private V5RandomIds() {
+    }
+
+    /**
+     * Returns an alphanumeric random string of the given length.
+     */
+    static String randomAlphanumeric(int len) {
+        StringBuilder sb = new StringBuilder(len);
+        for (int i = 0; i < len; i++) {
+            sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
+        }
+        return sb.toString();
+    }
+}


Reply via email to