This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 145b3a55d0f [fix][client] PIP-468: V5 CheckpointConsumer consumer-group support (#25622)
145b3a55d0f is described below
commit 145b3a55d0fe04ade57c18f3a0ce15c87239d752
Author: Matteo Merli <[email protected]>
AuthorDate: Thu Apr 30 07:39:46 2026 -0700
[fix][client] PIP-468: V5 CheckpointConsumer consumer-group support (#25622)
---
.../api/v5/V5CheckpointConsumerGroupTest.java | 491 +++++++++++++++++++++
.../client/api/v5/CheckpointConsumerBuilder.java | 23 +
.../impl/v5/CheckpointConsumerBuilderV5.java | 29 +-
.../client/impl/v5/ScalableCheckpointConsumer.java | 162 +++++--
.../client/impl/v5/StreamConsumerBuilderV5.java | 4 +-
.../apache/pulsar/client/impl/v5/V5RandomIds.java | 50 +++
6 files changed, 710 insertions(+), 49 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerGroupTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerGroupTest.java
new file mode 100644
index 00000000000..80673085f83
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerGroupTest.java
@@ -0,0 +1,491 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link CheckpointConsumerBuilder#consumerGroup(String)} — multiple
+ * checkpoint consumers in the same group share the topic's segments via the broker's
+ * subscription coordinator, identical to the StreamConsumer pattern but with
+ * client-side position state instead of broker-side cursors.
+ *
+ * <p>Two key invariants we assert here that don't apply to StreamConsumer:
+ * <ul>
+ * <li><b>Position state is client-side.</b> A checkpoint consumer resumes from
+ * whatever {@link Checkpoint} the application passes at create time. The broker
+ * does not track per-consumer cursors.</li>
+ * <li><b>No leftover metadata.</b> Once every member of a group goes away and the
+ * grace timer has fired, the broker's stats report either no subscription for that
+ * group on the topic or a subscription with zero consumers.</li>
+ * </ul>
+ */
+public class V5CheckpointConsumerGroupTest extends V5ClientBaseTest {
+
+ @Test
+ public void testConsumersInGroupShareSegments() throws Exception {
+ String topic = newScalableTopic(4);
+ String group = "group-share";
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Two checkpoint consumers in the same group: each should end up with a
+ // disjoint subset of segments via the controller's assignment.
+ @Cleanup
+ CheckpointConsumer<String> a =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.earliest())
+ .create();
+ @Cleanup
+ CheckpointConsumer<String> b =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ int n = 100;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < n; i++) {
+ String v = "v-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ Set<String> received = ConcurrentHashMap.newKeySet();
+ Set<String> aGot = ConcurrentHashMap.newKeySet();
+ Set<String> bGot = ConcurrentHashMap.newKeySet();
+ Thread t1 = drainTo(a, received, aGot);
+ Thread t2 = drainTo(b, received, bGot);
+ t1.join();
+ t2.join();
+
+ assertEquals(received, sent, "every message must be delivered exactly
once across the group");
+
+ Set<String> overlap = new HashSet<>(aGot);
+ overlap.retainAll(bGot);
+ assertTrue(overlap.isEmpty(), "no message should be delivered to both
consumers, overlap=" + overlap);
+
+ assertTrue(!aGot.isEmpty() && !bGot.isEmpty(),
+ "controller must split segments across both consumers"
+ + " (a=" + aGot.size() + " b=" + bGot.size() + ")");
+ }
+
+ @Test
+ public void testGroupScalesUp() throws Exception {
+ String topic = newScalableTopic(2);
+ String group = "group-scale";
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Phase 1: a single consumer in the group serves both segments.
+ CheckpointConsumer<String> c1 =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ int phaseN = 30;
+ produceBatch(producer, "p1-", phaseN);
+ Set<String> p1Received = drainExpected(c1, phaseN);
+ assertEquals(p1Received.size(), phaseN, "single consumer should drain
its phase exactly");
+
+ // Phase 2: add a second consumer. CheckpointConsumer has no broker-side cursor,
+ // so every consumer starts from whatever Checkpoint it provides. Use latest so
+ // c2 only reads messages produced after it joins (avoids re-reading phase 1
+ // from the segments it's assigned).
+ @Cleanup
+ CheckpointConsumer<String> c2 =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.latest())
+ .create();
+
+ produceBatch(producer, "p2-", phaseN);
+ Set<String> c1Phase2 = ConcurrentHashMap.newKeySet();
+ Set<String> c2Phase2 = ConcurrentHashMap.newKeySet();
+ Set<String> p2Received = ConcurrentHashMap.newKeySet();
+ Thread d1 = drainTo(c1, p2Received, c1Phase2);
+ Thread d2 = drainTo(c2, p2Received, c2Phase2);
+ d1.join();
+ d2.join();
+ assertEquals(p2Received.size(), phaseN, "phase 2 must be fully
delivered across both consumers");
+ assertTrue(!c1Phase2.isEmpty() && !c2Phase2.isEmpty(),
+ "phase 2 must split across both consumers: c1=" +
c1Phase2.size()
+ + " c2=" + c2Phase2.size());
+
+ c1.close();
+ }
+
+ @Test
+ public void testResumesFromClientProvidedCheckpoint() throws Exception {
+ // Single segment so per-iteration value assertions are deterministic — order
+ // is preserved within a segment.
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Use the unmanaged (no-group) checkpoint consumer here: this test isolates
+ // the "position state lives entirely in the client checkpoint" property.
+ // Adding a consumer group would mix in the broker's session-tracking — that's
+ // covered by the other tests in this class.
+ CheckpointConsumer<String> first =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ int prefix = 30;
+ for (int i = 0; i < prefix; i++) {
+ producer.newMessage().key("k-" + i).value("v-" + i).send();
+ }
+ for (int i = 0; i < prefix; i++) {
+ Message<String> msg = first.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "missed prefix message #" + i);
+ assertEquals(msg.value(), "v-" + i, "prefix message out of order");
+ assertEquals(msg.key().orElse(null), "k-" + i, "prefix key out of
order");
+ }
+ Checkpoint mid = first.checkpoint();
+
+ // Produce more data, then close the consumer entirely.
+ int suffix = 30;
+ for (int i = 0; i < suffix; i++) {
+ producer.newMessage().key("k-" + (prefix + i)).value("v-" +
(prefix + i)).send();
+ }
+ first.close();
+
+ // A fresh consumer that hands the saved checkpoint as its start position must
+ // resume from the message immediately after the one that produced the
+ // checkpoint — no broker state is involved.
+ @Cleanup
+ CheckpointConsumer<String> second =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(mid)
+ .create();
+
+ for (int i = 0; i < suffix; i++) {
+ Message<String> msg = second.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "missed resumed message #" + i);
+ assertEquals(msg.value(), "v-" + (prefix + i),
+ "resume from checkpoint delivered the wrong value at index
" + i);
+ assertEquals(msg.key().orElse(null), "k-" + (prefix + i),
+ "resume from checkpoint delivered the wrong key at index "
+ i);
+ }
+ assertNull(second.receive(Duration.ofMillis(200)),
+ "no extra messages should arrive past the produced suffix");
+ }
+
+ @Test
+ public void testNoMetadataLeftAfterAllGroupMembersClose() throws Exception
{
+ String topic = newScalableTopic(2);
+ String group = "group-cleanup";
+
+ // Open two members of the group on dedicated clients. Closing the client
+ // severs the underlying connection, which is what the broker observes as a
+ // disconnect — only then does the grace timer arm and eventually evict the
+ // session. (consumer.close() alone removes only the local registration.)
+ PulsarClient cli1 = newV5Client();
+ cli1.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.earliest())
+ .create();
+ PulsarClient cli2 = newV5Client();
+ cli2.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .consumerGroup(group)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ cli1.close();
+ cli2.close();
+
+ // Once the broker grace timer fires (default 60s; SharedPulsarCluster will
+ // lower this once PR #25619 lands), the controller evicts the sessions and
+ // deletes the persisted registrations. After that, the topic stats must report
+ // zero consumers in the group.
+ Awaitility.await().atMost(Duration.ofSeconds(90)).untilAsserted(() -> {
+ var stats = admin.scalableTopics().getStats(topic);
+ var sub = stats.getSubscriptions().get(group);
+ assertTrue(sub == null || sub.consumerCount() == 0,
+ "group '" + group + "' must leave no consumers behind, got
"
+ + (sub == null ? "null" : sub.consumerCount() + "
consumers"));
+ });
+ }
+
+ // --- Helpers ---
+
+ private void produceBatch(Producer<String> producer, String prefix, int n)
throws Exception {
+ for (int i = 0; i < n; i++) {
+ producer.newMessage().key("k-" + i).value(prefix + i).send();
+ }
+ }
+
+ private Set<String> drainExpected(CheckpointConsumer<String> consumer, int
expected) throws Exception {
+ Set<String> received = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < expected && System.currentTimeMillis() <
deadline) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ }
+ }
+ return received;
+ }
+
+ private Thread drainTo(CheckpointConsumer<String> consumer, Set<String>
all, Set<String> mine) {
+ Thread t = new Thread(() -> {
+ try {
+ while (true) {
+ Message<String> msg =
consumer.receive(Duration.ofSeconds(1));
+ if (msg == null) {
+ return;
+ }
+ all.add(msg.value());
+ mine.add(msg.value());
+ }
+ } catch (Exception ignored) {
+ }
+ }, "checkpoint-consumer-drainer");
+ t.start();
+ return t;
+ }
+
+ @Test
+ public void testReceiveMultiReturnsBatch() throws Exception {
+ // Single segment so messages stay in send order.
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ int n = 20;
+ for (int i = 0; i < n; i++) {
+ producer.newMessage().key("k-" + i).value("v-" + i).send();
+ }
+
+ // receiveMulti returns up to maxNumMessages within the timeout, ordered as
+ // they arrived from the segment. Drain the full batch in chunks of 8.
+ Set<String> received = new HashSet<>();
+ long deadline = System.currentTimeMillis() + 10_000L;
+ while (received.size() < n && System.currentTimeMillis() < deadline) {
+ Messages<String> batch = consumer.receiveMulti(8,
Duration.ofSeconds(1));
+ assertNotNull(batch);
+ for (Message<String> msg : batch) {
+ received.add(msg.value());
+ }
+ }
+ assertEquals(received.size(), n,
+ "receiveMulti must surface every produced message exactly
once");
+ }
+
+ @Test
+ public void testCheckpointResumeAcrossSplit() throws Exception {
+ // Single initial segment so the checkpoint position is unambiguous.
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Read the prefix and take a mid-stream checkpoint (still on the original
+ // single segment).
+ CheckpointConsumer<String> first =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ int prefix = 20;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < prefix; i++) {
+ String v = "pre-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+ Set<String> consumed = new HashSet<>();
+ for (int i = 0; i < prefix / 2; i++) {
+ Message<String> msg = first.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ consumed.add(msg.value());
+ }
+ Checkpoint mid = first.checkpoint();
+ first.close();
+
+ // Split the original segment, then produce more messages — these route to the
+ // new active children.
+ long parentId = -1;
+ var meta = admin.scalableTopics().getMetadata(topic);
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ parentId = seg.getSegmentId();
+ break;
+ }
+ }
+ assertTrue(parentId >= 0);
+ admin.scalableTopics().splitSegment(topic, parentId);
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ var m = admin.scalableTopics().getMetadata(topic);
+ for (var seg : m.getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 2, "split must produce 2 active children");
+ });
+
+ int suffix = 20;
+ for (int i = 0; i < suffix; i++) {
+ String v = "post-" + i;
+ producer.newMessage().key("k-suffix-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Resume from the saved checkpoint. The unmanaged consumer subscribes to
+ // active + sealed segments, so the saved position on the now-sealed parent
+ // still drains its remainder, and the children deliver the post-split data.
+ @Cleanup
+ CheckpointConsumer<String> resumed =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(mid)
+ .create();
+
+ Set<String> received = new HashSet<>();
+ // Expected: prefix/2 unread parent messages + all suffix messages.
+ int expected = (prefix - prefix / 2) + suffix;
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < expected && System.currentTimeMillis() <
deadline) {
+ Message<String> msg = resumed.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ }
+ }
+ Set<String> expectedSet = new HashSet<>(sent);
+ expectedSet.removeAll(consumed);
+ assertEquals(received, expectedSet,
+ "resume across split must replay the parent tail and every
child message");
+ }
+
+ @Test
+ public void testCheckpointResumeAcrossMerge() throws Exception {
+ // Two initial segments so we can merge them.
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ CheckpointConsumer<String> first =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ int prefix = 20;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < prefix; i++) {
+ String v = "pre-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+ Set<String> consumed = new HashSet<>();
+ for (int i = 0; i < prefix / 2; i++) {
+ Message<String> msg = first.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ consumed.add(msg.value());
+ }
+ Checkpoint mid = first.checkpoint();
+ first.close();
+
+ // Merge the two segments into one child — the originals seal.
+ var meta = admin.scalableTopics().getMetadata(topic);
+ java.util.List<Long> activeIds = new java.util.ArrayList<>();
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ activeIds.add(seg.getSegmentId());
+ }
+ }
+ assertEquals(activeIds.size(), 2);
+ admin.scalableTopics().mergeSegments(topic, activeIds.get(0),
activeIds.get(1));
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ var m = admin.scalableTopics().getMetadata(topic);
+ for (var seg : m.getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 1, "merge must collapse to 1 active segment");
+ });
+
+ int suffix = 20;
+ for (int i = 0; i < suffix; i++) {
+ String v = "post-" + i;
+ producer.newMessage().key("k-suffix-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ @Cleanup
+ CheckpointConsumer<String> resumed =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(mid)
+ .create();
+
+ Set<String> received = new HashSet<>();
+ int expected = (prefix - prefix / 2) + suffix;
+ long deadline = System.currentTimeMillis() + 30_000L;
+ while (received.size() < expected && System.currentTimeMillis() <
deadline) {
+ Message<String> msg = resumed.receive(Duration.ofSeconds(1));
+ if (msg != null) {
+ received.add(msg.value());
+ }
+ }
+ Set<String> expectedSet = new HashSet<>(sent);
+ expectedSet.removeAll(consumed);
+ assertEquals(received, expectedSet,
+ "resume across merge must replay both sealed parents' tails
and the merged child");
+ }
+}
diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
index 615473a382c..e9c6153fec5 100644
--- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
+++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/CheckpointConsumerBuilder.java
@@ -83,6 +83,29 @@ public interface CheckpointConsumerBuilder<T> {
*/
CheckpointConsumerBuilder<T> consumerName(String name);
+ /**
+ * Join a named consumer group on this scalable topic. All consumers that pass the
+ * same {@code group} share the topic's segments via the broker's subscription
+ * coordinator: each segment is assigned to exactly one consumer in the group at a
+ * time, and segments rebalance automatically as consumers join or leave.
+ *
+ * <p>When unset (the default), the consumer is unmanaged: it independently reads
+ * every segment from {@link #startPosition the configured start position},
+ * unaffected by any other consumer. This matches the original reader-style
+ * behavior of {@link CheckpointConsumer}.
+ *
+ * <p>Unlike {@link StreamConsumerBuilder}'s subscription, joining a group does
+ * <em>not</em> cause the broker to persist a cursor: each consumer still resumes
+ * from the {@code startPosition} (or a checkpoint deserialized from a previous
+ * run) it provides at create time. The group only affects how segments are
+ * distributed across the live consumer set; once every member of the group goes
+ * away, no broker-side state remains for it.
+ *
+ * @param group the consumer group name
+ * @return this builder instance for chaining
+ */
+ CheckpointConsumerBuilder<T> consumerGroup(String group);
+
/**
* Configure end-to-end message encryption for decryption.
*
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
index c525d02d878..c65e16d0fc5 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/CheckpointConsumerBuilderV5.java
@@ -26,6 +26,7 @@ import
org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder;
import org.apache.pulsar.client.api.v5.PulsarClientException;
import org.apache.pulsar.client.api.v5.config.EncryptionPolicy;
import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.common.api.proto.ScalableConsumerType;
import org.apache.pulsar.common.naming.TopicName;
/**
@@ -38,6 +39,7 @@ final class CheckpointConsumerBuilderV5<T> implements
CheckpointConsumerBuilder<
private String topicName;
private Checkpoint startPosition = CheckpointV5.LATEST;
private String consumerName;
+ private String consumerGroup;
private EncryptionPolicy encryptionPolicy;
CheckpointConsumerBuilderV5(PulsarClientV5 client, Schema<T> v5Schema) {
@@ -65,10 +67,27 @@ final class CheckpointConsumerBuilderV5<T> implements
CheckpointConsumerBuilder<
}
TopicName topic = V5Utils.asScalableTopicName(topicName);
- DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
+ if (consumerGroup != null && !consumerGroup.isEmpty()) {
+ // Managed: register with the broker's subscription coordinator under the
+ // configured group. Each consumer in the group ends up with a disjoint set
+ // of segments.
+ String name = consumerName != null && !consumerName.isEmpty()
+ ? consumerName
+ : "v5-checkpoint-" + V5RandomIds.randomAlphanumeric(8);
+ ScalableConsumerClient session = new ScalableConsumerClient(
+ client.v4Client(), topic, consumerGroup, name,
+ ScalableConsumerType.CHECKPOINT);
+ return session.start()
+ .thenCompose(initialAssignment ->
ScalableCheckpointConsumer.createManagedAsync(
+ client, v5Schema, topic.toString(), session,
initialAssignment,
+ startPosition, name));
+ }
+
+ // Unmanaged: read every active and sealed segment, no broker-side state.
+ DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic);
return dagWatch.start()
- .thenCompose(initialLayout ->
ScalableCheckpointConsumer.createAsync(
+ .thenCompose(initialLayout ->
ScalableCheckpointConsumer.createUnmanagedAsync(
client, v5Schema, dagWatch, initialLayout,
startPosition, consumerName));
}
@@ -90,6 +109,12 @@ final class CheckpointConsumerBuilderV5<T> implements
CheckpointConsumerBuilder<
return this;
}
+ @Override
+ public CheckpointConsumerBuilderV5<T> consumerGroup(String group) {
+ this.consumerGroup = group;
+ return this;
+ }
+
@Override
public CheckpointConsumerBuilderV5<T> encryptionPolicy(EncryptionPolicy
policy) {
this.encryptionPolicy = policy;
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 34413a8153f..9288780f39b 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
+import
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.v5.Checkpoint;
import org.apache.pulsar.client.api.v5.CheckpointConsumer;
@@ -45,11 +46,23 @@ import
org.apache.pulsar.client.impl.v5.SegmentRouter.ActiveSegment;
/**
* V5 CheckpointConsumer implementation for scalable topics.
*
- * <p>Maintains per-segment v4 Readers (no subscription). Messages from all segments
- * are multiplexed into a single receive queue. Supports creating checkpoints (atomic
- * snapshots of positions across all segments) and seeking to previously saved checkpoints.
+ * <p>Maintains per-segment v4 Readers (no subscription cursor — position state lives
+ * client-side and is materialized as {@link Checkpoint} via {@link #checkpoint()}).
+ * Messages from all segments are multiplexed into a single receive queue. Supports
+ * creating checkpoints (atomic snapshots of positions across all segments) and seeking
+ * to previously saved checkpoints.
+ *
+ * <p>Two segment sources are supported, picked by the caller:
+ * <ul>
+ * <li><b>Unmanaged</b> — the consumer reads every active and sealed segment (driven by a
+ * {@link DagWatchClient}). Multiple unmanaged consumers each independently see
+ * the full stream.</li>
+ * <li><b>Managed (consumer group)</b> — segments are assigned by the broker's
+ * subscription coordinator (driven by a {@link ScalableConsumerClient}).
+ * Consumers in the same group share segments and rebalance on join/leave.</li>
+ * </ul>
*/
-final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>,
DagWatchClient.LayoutChangeListener {
+final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T> {
private static final Logger LOG =
Logger.get(ScalableCheckpointConsumer.class);
private final Logger log;
@@ -57,7 +70,7 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
private final PulsarClientV5 client;
private final Schema<T> v5Schema;
private final org.apache.pulsar.client.api.Schema<T> v4Schema;
- private final DagWatchClient dagWatch;
+ private final AutoCloseable sourceHandle;
private final String topicName;
private final Checkpoint startPosition;
private final String consumerName;
@@ -76,14 +89,15 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
private ScalableCheckpointConsumer(PulsarClientV5 client,
Schema<T> v5Schema,
- DagWatchClient dagWatch,
+ String topicName,
+ AutoCloseable sourceHandle,
Checkpoint startPosition,
String consumerName) {
this.client = client;
this.v5Schema = v5Schema;
this.v4Schema = SchemaAdapter.toV4(v5Schema);
- this.dagWatch = dagWatch;
- this.topicName = dagWatch.topicName().toString();
+ this.sourceHandle = sourceHandle;
+ this.topicName = topicName;
this.startPosition = startPosition;
this.consumerName = consumerName;
this.log = LOG.with().attr("topic", topicName).build();
@@ -91,22 +105,56 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
}
/**
- * Create a fully initialized consumer asynchronously. The returned future
completes
- * only after every initial segment reader has been successfully created.
If any
- * reader creation fails, all already-created readers are closed and the
future
- * completes exceptionally.
+ * Create an unmanaged consumer that reads every active and sealed segment of the
+ * layout (independent of any consumer group). Driven by a {@link DagWatchClient}, which
+ * is closed on consumer close.
*/
- static <T> CompletableFuture<CheckpointConsumer<T>>
createAsync(PulsarClientV5 client,
- Schema<T>
v5Schema,
-
DagWatchClient dagWatch,
-
ClientSegmentLayout initialLayout,
- Checkpoint
startPosition,
- String
consumerName) {
+ static <T> CompletableFuture<CheckpointConsumer<T>> createUnmanagedAsync(
+ PulsarClientV5 client, Schema<T> v5Schema, DagWatchClient dagWatch,
+ ClientSegmentLayout initialLayout, Checkpoint startPosition,
String consumerName) {
ScalableCheckpointConsumer<T> consumer = new
ScalableCheckpointConsumer<>(
- client, v5Schema, dagWatch, startPosition, consumerName);
- return consumer.createSegmentReaders(initialLayout)
+ client, v5Schema, dagWatch.topicName().toString(), dagWatch,
startPosition, consumerName);
+ return consumer.applyAssignment(allSegmentsOf(initialLayout))
.thenApply(__ -> {
- dagWatch.setListener(consumer);
+ dagWatch.setListener((newLayout, oldLayout) ->
consumer.onAssignmentChange(
+ allSegmentsOf(newLayout),
+ oldLayout != null ? allSegmentsOf(oldLayout) :
List.of()));
+ return (CheckpointConsumer<T>) consumer;
+ })
+ .exceptionallyCompose(ex -> consumer.closeAsync().handle((__,
___) -> {
+ throw ex instanceof CompletionException ce ? ce : new
CompletionException(ex);
+ }));
+ }
+
+ /**
+ * Active + sealed segments. The unmanaged checkpoint consumer needs to subscribe
+ * to sealed segments too so a {@link Checkpoint} taken before a split or merge
+ * still resumes correctly: the reader on each sealed parent picks up from the
+ * saved position and drains its remaining backlog before naturally exiting on
+ * {@code TopicTerminated}.
+ */
+ private static List<ActiveSegment> allSegmentsOf(ClientSegmentLayout
layout) {
+ List<ActiveSegment> all = new ArrayList<>(
+ layout.activeSegments().size() +
layout.sealedSegments().size());
+ all.addAll(layout.activeSegments());
+ all.addAll(layout.sealedSegments());
+ return all;
+ }
+
+ /**
+ * Create a managed consumer that reads only the segments the broker's subscription
+ * coordinator assigns to it within the named consumer group. Driven by a
+ * {@link ScalableConsumerClient}, which is closed on consumer close.
+ */
+ static <T> CompletableFuture<CheckpointConsumer<T>> createManagedAsync(
+ PulsarClientV5 client, Schema<T> v5Schema, String topicName,
+ ScalableConsumerClient session, List<ActiveSegment>
initialAssignment,
+ Checkpoint startPosition, String consumerName) {
+ ScalableCheckpointConsumer<T> consumer = new
ScalableCheckpointConsumer<>(
+ client, v5Schema, topicName, session, startPosition,
consumerName);
+ return consumer.applyAssignment(initialAssignment)
+ .thenApply(__ -> {
+ session.setListener(consumer::onAssignmentChange);
return (CheckpointConsumer<T>) consumer;
})
.exceptionallyCompose(ex -> consumer.closeAsync().handle((__,
___) -> {
@@ -282,7 +330,11 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
CompletableFuture<Void> closeAsync() {
closed = true;
- dagWatch.close();
+ try {
+ sourceHandle.close();
+ } catch (Exception e) {
+ log.warn().exceptionMessage(e).log("Error closing segment source");
+ }
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (var future : segmentReaders.values()) {
@@ -295,42 +347,42 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
.whenComplete((__, ___) -> segmentReaders.clear());
}
- // --- Layout change handling ---
+ // --- Assignment change handling ---
- @Override
- public void onLayoutChange(ClientSegmentLayout newLayout,
ClientSegmentLayout oldLayout) {
+ private void onAssignmentChange(List<ActiveSegment> newSegments,
List<ActiveSegment> oldSegments) {
// Fully async: safe to run on the netty IO thread that delivered the
update.
- createSegmentReaders(newLayout).exceptionally(ex -> {
- log.warn().exceptionMessage(ex).log("Failed to apply layout
update");
+ applyAssignment(newSegments).exceptionally(ex -> {
+ log.warn().exceptionMessage(ex).log("Failed to apply segment
assignment");
return null;
});
}
- private CompletableFuture<Void> createSegmentReaders(ClientSegmentLayout
layout) {
- var activeIds = ConcurrentHashMap.<Long>newKeySet();
- for (var seg : layout.activeSegments()) {
- activeIds.add(seg.segmentId());
+ private CompletableFuture<Void> applyAssignment(List<ActiveSegment>
assigned) {
+ var assignedIds = ConcurrentHashMap.<Long>newKeySet();
+ for (var seg : assigned) {
+ assignedIds.add(seg.segmentId());
}
- // Close readers for segments that are no longer active
(fire-and-forget).
+ // Close readers for segments removed from the assignment (sealed, or rebalanced
+ // away to another consumer in the same group).
for (var entry : segmentReaders.entrySet()) {
- if (!activeIds.contains(entry.getKey())) {
+ if (!assignedIds.contains(entry.getKey())) {
log.info().attr("segmentId", entry.getKey())
- .log("Closing reader for sealed segment");
+ .log("Closing reader for segment removed from
assignment");
entry.getValue().thenAccept(r -> r.closeAsync());
segmentReaders.remove(entry.getKey());
+ lastReceivedPositions.remove(entry.getKey());
}
}
// Create readers for new segments asynchronously.
List<CompletableFuture<?>> futures = new ArrayList<>();
- for (var seg : layout.activeSegments()) {
+ for (var seg : assigned) {
futures.add(segmentReaders.computeIfAbsent(seg.segmentId(),
id -> createSegmentReaderAsync(seg)));
}
- log.info().attr("epoch", layout.epoch())
- .attr("segments", activeIds).log("Checkpoint consumer layout applied");
+ log.info().attr("segments", assignedIds).log("Checkpoint consumer assignment applied");
return CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
}
@@ -353,18 +405,21 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, DagW
}
private org.apache.pulsar.client.api.MessageId resolveStartPosition(long segmentId) {
- // If we have a regular checkpoint, use the segment's position
if (startPosition instanceof CheckpointV5 cp) {
var pos = cp.segmentPositions().get(segmentId);
if (pos != null) {
return pos;
}
+ // The checkpoint has no position for this segment, which means the
+ // segment didn't exist when the checkpoint was taken (it's a child
+ // produced by a split or merge after the snapshot). All of its data is
+ // therefore newer than the checkpoint — read from the earliest.
+ return org.apache.pulsar.client.api.MessageId.earliest;
}
- // For sentinel checkpoints or missing segments, use earliest/latest
if (startPosition == CheckpointV5.EARLIEST) {
return org.apache.pulsar.client.api.MessageId.earliest;
}
- // Default to latest
+ // CheckpointV5.LATEST and anything else: latest.
return org.apache.pulsar.client.api.MessageId.latest;
}
@@ -381,11 +436,30 @@ final class ScalableCheckpointConsumer<T> implements CheckpointConsumer<T>, DagW
startReadLoop(reader, segmentId);
}
}).exceptionally(ex -> {
- if (!closed) {
- log.warn().attr("segmentId", segmentId)
- .exception(ex).log("Error reading from segment, retrying");
- startReadLoop(reader, segmentId);
+ Throwable cause = ex instanceof CompletionException ce && ce.getCause() != null
+ ? ce.getCause() : ex;
+ if (closed || cause instanceof AlreadyClosedException) {
+ // The whole consumer is shutting down or this reader was closed
+ // externally (segment sealed or rebalanced away). Stop the loop.
+ return null;
+ }
+ if (cause instanceof org.apache.pulsar.client.api.PulsarClientException
+ .TopicTerminatedException) {
+ // Sealed segment fully drained server-side. Close the reader and drop
+ // it from the map so resources are released; the segment's data has
+ // already crossed into messageQueue.
+ log.info().attr("segmentId", segmentId)
+ .log("Sealed segment drained, closing reader");
+ segmentReaders.remove(segmentId);
+ reader.closeAsync();
+ return null;
}
+ log.warn().attr("segmentId", segmentId)
+ .exception(ex).log("Error reading from segment, retrying");
+ // Hop to the v4 client's internal executor so repeated synchronous failures
+ // don't grow the stack unboundedly.
+ client.v4Client().getInternalExecutorService()
+ .execute(() -> startReadLoop(reader, segmentId));
return null;
});
}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
index f8ae92b014a..d3c356ecb55 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java
@@ -75,10 +75,8 @@ final class StreamConsumerBuilderV5<T> implements StreamConsumerBuilder<T> {
TopicName topic = V5Utils.asScalableTopicName(topicName);
// Default the consumer name to a stable random when the user didn't set one —
// ScalableConsumerClient uses it as the registration key with the controller.
- // Use a full UUID (~122 bits of entropy) so the registration key is unique in
- // practice, even across many concurrent attaches.
if (conf.getConsumerName() == null || conf.getConsumerName().isEmpty()) {
- conf.setConsumerName("v5-stream-" + java.util.UUID.randomUUID());
+ conf.setConsumerName("v5-stream-" + V5RandomIds.randomAlphanumeric(8));
}
ScalableConsumerClient session = new ScalableConsumerClient(
client.v4Client(),
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5RandomIds.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5RandomIds.java
new file mode 100644
index 00000000000..82f3444fbe5
--- /dev/null
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/V5RandomIds.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import java.security.SecureRandom;
+
+/**
+ * Random short identifiers used for default consumer / producer names that the V5
+ * builders generate when the user did not provide one.
+ *
+ * <p>Eight alphanumeric characters from a {@link SecureRandom} source give ~48 bits of
+ * entropy — enough to avoid collisions in practice while keeping log lines and broker
+ * registration keys short.
+ */
+final class V5RandomIds {
+
+ private static final SecureRandom RANDOM = new SecureRandom();
+ private static final String ALPHABET =
+ "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+
+ private V5RandomIds() {
+ }
+
+ /**
+ * Returns an alphanumeric random string of the given length.
+ */
+ static String randomAlphanumeric(int len) {
+ StringBuilder sb = new StringBuilder(len);
+ for (int i = 0; i < len; i++) {
+ sb.append(ALPHABET.charAt(RANDOM.nextInt(ALPHABET.length())));
+ }
+ return sb.toString();
+ }
+}