This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8a7eed351db [test][client-v5] V5 layout-dynamics coverage + consumer
fixes (DAG replay, late-ack) (#25611)
8a7eed351db is described below
commit 8a7eed351dbfb3b6bf5213bfccc04e3f215c3b63
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 29 04:11:31 2026 -0700
[test][client-v5] V5 layout-dynamics coverage + consumer fixes (DAG replay,
late-ack) (#25611)
---
.../pulsar/client/api/v5/V5CumulativeAckTest.java | 285 +++++++++++++++++++++
.../pulsar/client/api/v5/V5DAGFollowingTest.java | 180 +++++++++++++
.../pulsar/client/api/v5/V5KeyRoutingTest.java | 149 +++++++++++
.../api/v5/V5QueueConsumerMultiSegmentAckTest.java | 163 ++++++++++++
.../pulsar/client/api/v5/V5SegmentMergeTest.java | 114 +++++++++
.../pulsar/client/api/v5/V5SegmentSplitTest.java | 114 +++++++++
.../pulsar/client/impl/v5/ClientSegmentLayout.java | 33 ++-
.../client/impl/v5/ScalableQueueConsumer.java | 44 +++-
.../client/impl/v5/ScalableStreamConsumer.java | 45 ++--
9 files changed, 1095 insertions(+), 32 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CumulativeAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CumulativeAckTest.java
new file mode 100644
index 00000000000..9560da10ef0
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CumulativeAckTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link StreamConsumer#acknowledgeCumulative(MessageId)} on a
multi-segment
+ * scalable topic.
+ *
+ * <p>Each message returned by a V5 stream consumer carries a position vector
— a snapshot
+ * of the latest delivered offset on every active segment as of that point in
the stream.
+ * Cumulative-acking a single {@link MessageId} must therefore advance the
cursor on
+ * <em>every</em> segment, not just the segment that produced this particular
message.
+ *
+ * <p>We assert this end-to-end: produce N messages across multiple segments,
drain them,
+ * cumulative-ack the last received id, then attach a fresh consumer on the
same
+ * subscription. The fresh consumer must observe an empty backlog.
+ */
+public class V5CumulativeAckTest extends V5ClientBaseTest {
+
+    /**
+     * Drains every message from a topic created with {@code newScalableTopic(4)},
+     * cumulative-acks only the last received id, and checks that a consumer re-attached
+     * to the same subscription receives nothing within the receive timeout.
+     */
+    @Test
+    public void testCumulativeAckCoversAllSegments() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "cum-ack-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Subscribe before producing so the subscription cursor exists at the start of
+        // every segment. EARLIEST on its own would also cover the backlog; doing both
+        // keeps the test unambiguous.
+        StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Distinct keys per message so the sends are not all tied to a single key.
+        int n = 100;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain all n messages without acking; only the id of the last one is kept.
+        Set<String> received = new HashSet<>();
+        MessageId last = null;
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed message #" + i);
+            received.add(msg.value());
+            last = msg.id();
+        }
+        assertEquals(received, sent, "should drain every produced message");
+
+        // Single cumulative ack on the last id — the position vector embedded in this id
+        // must advance the cursor on every segment.
+        assertNotNull(last);
+        consumer.acknowledgeCumulative(last);
+
+        // Close and re-open the consumer on the same subscription. With a complete
+        // cumulative ack, the new attach must see no backlog.
+        consumer.close();
+
+        @Cleanup
+        StreamConsumer<String> reopened = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        // A null result here means nothing was redelivered within 500 ms.
+        Message<String> stale = reopened.receive(Duration.ofMillis(500));
+        assertNull(stale,
+                "after a single cumulative ack of the last received id, no message"
+                        + " should remain unacked on any segment");
+    }
+
+    /**
+     * Cumulative ack in the middle of the stream: re-attaching the consumer must replay
+     * only messages from the unacked tail — never an already-acked message, and never
+     * more than the tail's size.
+     */
+    @Test
+    public void testCumulativeAckMidStreamReplaysUnackedTail() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "cum-ack-mid-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 60;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Receive the first ackedCount messages, ack the last of those cumulatively.
+        int ackedCount = 25;
+        Set<String> firstHalf = new HashSet<>();
+        MessageId midId = null;
+        for (int i = 0; i < ackedCount; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed mid-stream message #" + i);
+            firstHalf.add(msg.value());
+            midId = msg.id();
+        }
+        assertNotNull(midId);
+        consumer.acknowledgeCumulative(midId);
+
+        // Drain the rest into a second set so both halves stay disjoint for the assertion.
+        Set<String> secondHalf = new HashSet<>();
+        for (int i = 0; i < n - ackedCount; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed tail message #" + (ackedCount + i));
+            secondHalf.add(msg.value());
+        }
+        // The consumer must have seen all n messages exactly once across the two halves.
+        Set<String> all = new HashSet<>(firstHalf);
+        all.addAll(secondHalf);
+        assertEquals(all, sent, "consumer must see every produced message exactly once");
+
+        // Close before re-attaching so the broker treats this as a fresh consumer.
+        consumer.close();
+
+        @Cleanup
+        StreamConsumer<String> reopened = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        // Re-attach must replay only the unacked tail. The exact count can differ from
+        // (n - ackedCount) because the cumulative ack is per-segment and may include
+        // messages that hadn't been touched yet on segments not crossed by the position
+        // vector at midId. We assert: replayed.size() <= n - ackedCount, and every
+        // replayed value comes from secondHalf (i.e. nothing already-acked is replayed).
+        Set<String> replayed = new HashSet<>();
+        while (true) {
+            Message<String> msg = reopened.receive(Duration.ofMillis(500));
+            if (msg == null) {
+                break;
+            }
+            replayed.add(msg.value());
+        }
+        assertTrue(replayed.size() <= n - ackedCount,
+                "replay must not exceed unacked tail size (" + (n - ackedCount)
+                        + "), got " + replayed.size());
+        for (String v : replayed) {
+            // Membership in secondHalf is the contract stated above; checking it directly
+            // also rejects a value that was never seen in either half.
+            assertTrue(secondHalf.contains(v),
+                    "replayed value " + v + " was already cumulative-acked");
+        }
+    }
+
+    /**
+     * Cumulative ack must continue to do the right thing across a split: messages from
+     * the now-sealed parent segment that are covered by the ack stay acked, and the cursor
+     * advances onto the children. Re-attaching after the split sees only the post-ack tail.
+     *
+     * <p>Flow: produce and cumulative-ack a batch on the single initial segment, split that
+     * segment, produce and cumulative-ack a second batch, then re-attach and expect no
+     * message within the receive timeout.
+     */
+    @Test
+    public void testCumulativeAckCrossesSealedParentToChildren() throws Exception {
+        String topic = newScalableTopic(1);
+        String subscription = "cum-ack-cross-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Parent batch on the only initial segment.
+        int parentBatch = 30;
+        for (int i = 0; i < parentBatch; i++) {
+            producer.newMessage().key("k-" + i).value("parent-" + i).send();
+        }
+
+        // Drain the parent batch and cumulative-ack the last one. That should fully ack
+        // the (still active) parent segment.
+        MessageId lastParent = null;
+        for (int i = 0; i < parentBatch; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed parent #" + i);
+            lastParent = msg.id();
+        }
+        consumer.acknowledgeCumulative(lastParent);
+
+        // Split: look up the id of the first active segment reported by the admin
+        // metadata and split it. -1 is the "not found" sentinel checked below.
+        long parentId = -1;
+        var meta = admin.scalableTopics().getMetadata(topic);
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                parentId = seg.getSegmentId();
+                break;
+            }
+        }
+        assertTrue(parentId >= 0);
+        admin.scalableTopics().splitSegment(topic, parentId);
+
+        // The split is synchronous server-side, but the V5 client's DAG watch is async —
+        // sending before the watch delivers the new layout would fail with TopicTerminated.
+        // NOTE(review): this loop polls the metadata returned by the admin API, not the
+        // producer's own view of the layout — confirm that is sufficient to avoid the race.
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            var m = admin.scalableTopics().getMetadata(topic);
+            for (var seg : m.getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 2, "split must produce 2 active children");
+        });
+
+        // Child batch on the new children.
+        int childBatch = 30;
+        Set<String> childSent = new HashSet<>();
+        for (int i = 0; i < childBatch; i++) {
+            String v = "child-" + i;
+            producer.newMessage().key("k-child-" + i).value(v).send();
+            childSent.add(v);
+        }
+
+        // Drain children and cumulative-ack the last one.
+        Set<String> childReceived = new HashSet<>();
+        MessageId lastChild = null;
+        for (int i = 0; i < childBatch; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed child #" + i);
+            childReceived.add(msg.value());
+            lastChild = msg.id();
+        }
+        assertEquals(childReceived, childSent, "consumer must see every child message");
+        consumer.acknowledgeCumulative(lastChild);
+        consumer.close();
+
+        // Re-attach: parent fully acked + every child acked → no backlog.
+        @Cleanup
+        StreamConsumer<String> reopened = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Message<String> stale = reopened.receive(Duration.ofMillis(500));
+        assertNull(stale,
+                "after cumulative-acking parent and all children, no message"
+                        + " should remain on the subscription");
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DAGFollowingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DAGFollowingTest.java
new file mode 100644
index 00000000000..461e20c0106
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DAGFollowingTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for DAG-following on the QueueConsumer:
+ * <ul>
+ * <li>An existing subscription transparently drains across a chain of
splits.</li>
+ * <li>A brand-new subscription created with EARLIEST after a split replays
the sealed
+ * parent's data before transitioning onto the active children — the
QueueConsumer
+ * subscribes to every segment in the DAG (active + sealed), not just
the active
+ * ones.</li>
+ * </ul>
+ */
+public class V5DAGFollowingTest extends V5ClientBaseTest {
+
+    /**
+     * Produces a batch to the single initial segment, splits it, produces a second batch,
+     * and only then creates a new EARLIEST subscription. The new consumer must receive
+     * every message from both batches.
+     */
+    @Test
+    public void testNewEarliestSubscriptionReplaysSealedParent() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Write data to the only initial segment, then split it. After the split the
+        // parent is sealed and two children are active.
+        int parentBatch = 30;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < parentBatch; i++) {
+            String v = "parent-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // activeIds(...) is sorted, so get(0) is the lowest-id active segment — here the
+        // only one. Wait until the admin metadata reports two active segments.
+        admin.scalableTopics().splitSegment(topic, activeIds(topic).get(0));
+        Awaitility.await().untilAsserted(() -> assertEquals(activeIds(topic).size(), 2));
+
+        // Produce more — these route through the new active children.
+        int childBatch = 30;
+        for (int i = 0; i < childBatch; i++) {
+            String v = "child-" + i;
+            producer.newMessage().key("k-child-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Brand-new subscription with EARLIEST. It must see both the sealed parent's
+        // backlog and the children's data, in some order.
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("new-earliest-replay")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Receive exactly parentBatch + childBatch messages, acking each individually.
+        Set<String> received = new HashSet<>();
+        int total = parentBatch + childBatch;
+        for (int i = 0; i < total; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i
+                    + " (received so far: " + received.size() + "/" + total + ")");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received, sent,
+                "new EARLIEST subscription must replay sealed-parent + children");
+    }
+
+    /**
+     * Keeps one producer and one EARLIEST QueueConsumer open across two successive splits
+     * (1 → 2 → 3 active segments), producing a batch in each generation, and checks that
+     * the consumer receives every message from all three generations.
+     */
+    @Test
+    public void testMultiGenerationDAGFollowing() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("multi-gen")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> sent = new HashSet<>();
+        int batch = 20;
+
+        // Generation 0: single segment.
+        for (int i = 0; i < batch; i++) {
+            String v = "g0-" + i;
+            producer.newMessage().key("k0-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Split 0 → {1, 2}: now 2 active. The admin call is synchronous server-side,
+        // but the V5 client's DAG watch is async — wait for the producer's view to catch
+        // up before sending into the new layout (otherwise the next send hits the now-
+        // sealed parent and fails with TopicTerminated).
+        // NOTE(review): waitForActiveCount polls the admin metadata via activeIds(), not
+        // the producer's own layout view — confirm that is sufficient to avoid the race.
+        admin.scalableTopics().splitSegment(topic, activeIds(topic).get(0));
+        waitForActiveCount(topic, 2);
+
+        // Generation 1: 2 segments.
+        for (int i = 0; i < batch; i++) {
+            String v = "g1-" + i;
+            producer.newMessage().key("k1-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Split one of the children: pick the smaller-id active child for determinism
+        // (activeIds() returns ids sorted ascending). After this we have 3 active.
+        admin.scalableTopics().splitSegment(topic, activeIds(topic).get(0));
+        waitForActiveCount(topic, 3);
+
+        // Generation 2: 3 segments.
+        for (int i = 0; i < batch; i++) {
+            String v = "g2-" + i;
+            producer.newMessage().key("k2-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain: 3 generations × batch each.
+        Set<String> received = new HashSet<>();
+        int total = 3 * batch;
+        for (int i = 0; i < total; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i
+                    + " (received so far: " + received.size() + "/" + total + ")");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+
+        // total receives yielding total distinct values means no value arrived twice.
+        assertEquals(received.size(), total, "expected " + total + " distinct messages");
+        assertEquals(received, sent, "received set must cover every generation");
+    }
+
+    /**
+     * Returns the ids of the segments the admin metadata currently marks active, sorted
+     * in ascending order.
+     */
+    private List<Long> activeIds(String topic) throws Exception {
+        List<Long> active = new ArrayList<>();
+        for (var segment : admin.scalableTopics().getMetadata(topic).getSegments().values()) {
+            if (!segment.isActive()) {
+                continue;
+            }
+            active.add(segment.getSegmentId());
+        }
+        // A null comparator means natural ordering, the same order Collections.sort uses.
+        active.sort(null);
+        return active;
+    }
+
+    /**
+     * Blocks (with Awaitility's default settings) until {@link #activeIds(String)} reports
+     * exactly {@code expected} active segments for the topic.
+     */
+    private void waitForActiveCount(String topic, int expected) {
+        String failure = "layout never converged to " + expected + " active segments";
+        Awaitility.await().untilAsserted(
+                () -> assertEquals(activeIds(topic).size(), expected, failure));
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5KeyRoutingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5KeyRoutingTest.java
new file mode 100644
index 00000000000..0e023e84df3
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5KeyRoutingTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 producer's key-based routing on a multi-segment
scalable topic.
+ *
+ * <p>The V5 producer hashes the message key (MurmurHash3, masked to 16 bits)
and routes
+ * to the active segment whose hash range covers the hash. We don't assert
which segment
+ * a key lands on (an internal detail), but we assert the two observable
contracts:
+ * <ul>
+ * <li>Same key → same segment, every time. We verify this by sending many
messages with
+ * a small set of keys and checking that the per-key receive order
matches the
+ * per-key send order on a {@link StreamConsumer} (which preserves order
within a
+ * segment but not across segments).</li>
+ * <li>Nothing is dropped or duplicated across segments. With four segments and a varied
+ * key set, every message is received exactly once and the topic still reports four
+ * active segments. Per-segment distribution is not asserted.</li>
+ * </ul>
+ */
+public class V5KeyRoutingTest extends V5ClientBaseTest {
+
+    /**
+     * Sends interleaved messages for a small set of keys and checks that, per key, the
+     * values come back in the order they were sent.
+     */
+    @Test
+    public void testSameKeyPreservesOrder() throws Exception {
+        String topic = newScalableTopic(4);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("key-route")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // 5 keys × 20 messages each, interleaved — same-key messages must arrive in send
+        // order (which only holds if every same-key message lands on the same segment).
+        List<String> keys = List.of("alpha", "bravo", "charlie", "delta", "echo");
+        int perKey = 20;
+
+        // One send-order list and one receive-order list per key, created up front.
+        Map<String, List<String>> sent = new HashMap<>();
+        Map<String, List<String>> received = new HashMap<>();
+        for (String name : keys) {
+            sent.put(name, new ArrayList<>());
+            received.put(name, new ArrayList<>());
+        }
+
+        for (int round = 0; round < perKey; round++) {
+            for (String name : keys) {
+                String payload = name + "-" + round;
+                producer.newMessage().key(name).value(payload).send();
+                sent.get(name).add(payload);
+            }
+        }
+
+        int expectedCount = keys.size() * perKey;
+        for (int i = 0; i < expectedCount; i++) {
+            Message<String> delivered = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(delivered, "missed message #" + i);
+            String deliveredKey =
+                    delivered.key().orElseThrow(() -> new AssertionError("missing key"));
+            received.get(deliveredKey).add(delivered.value());
+            consumer.acknowledgeCumulative(delivered.id());
+        }
+
+        for (String name : keys) {
+            assertEquals(received.get(name), sent.get(name),
+                    "per-key order must be preserved for key=" + name);
+        }
+    }
+
+    /**
+     * End-to-end sanity check on a topic created with {@code newScalableTopic(4)}: 200
+     * messages with distinct keys must each be received exactly once, and the admin
+     * metadata must still report four active segments afterwards. Which segment each key
+     * lands on is not asserted.
+     */
+    @Test
+    public void testMultiSegmentEndToEndCount() throws Exception {
+        String topic = newScalableTopic(4);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("multi-seg-count")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // 200 distinct keys: every send should land on some segment, every message should
+        // come out exactly once. This is the basic "scalable topic with N>1 segments
+        // doesn't drop or duplicate messages" sanity check.
+        int n = 200;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String value = "v-" + i;
+            producer.newMessage().key("k-" + i).value(value).send();
+            sent.add(value);
+        }
+
+        Set<String> received = new HashSet<>();
+        MessageId last = null;
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed message #" + i);
+            received.add(msg.value());
+            last = msg.id();
+        }
+        consumer.acknowledgeCumulative(last);
+
+        // n receives producing n distinct values means no value was delivered twice.
+        assertEquals(received.size(), n, "expected " + n + " distinct messages");
+        assertEquals(received, sent, "received set must match sent set");
+
+        // The layout must be unchanged by plain produce/consume traffic.
+        var meta = admin.scalableTopics().getMetadata(topic);
+        int active = 0;
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                active++;
+            }
+        }
+        assertEquals(active, 4, "expected 4 active segments");
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerMultiSegmentAckTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerMultiSegmentAckTest.java
new file mode 100644
index 00000000000..ea557a2ce6b
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerMultiSegmentAckTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * QueueConsumer counterpart of {@link
V5CumulativeAckTest#testCumulativeAckCoversAllSegments()}.
+ *
+ * <p>QueueConsumer doesn't expose cumulative ack — each {@link
QueueConsumer#acknowledge}
+ * call covers exactly one message. We assert that individually acking every
message on a
+ * multi-segment topic correctly advances the cursor on every per-segment v4
consumer:
+ * after a close + re-attach on the same subscription, the new consumer must
observe an
+ * empty backlog.
+ */
+public class V5QueueConsumerMultiSegmentAckTest extends V5ClientBaseTest {
+
+    /**
+     * Receives and individually acknowledges every produced message, then re-attaches to
+     * the same subscription and expects no message within the receive timeout.
+     */
+    @Test
+    public void testIndividualAcksCoverAllSegments() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "queue-multi-seg-ack-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int messageCount = 100;
+        Set<String> sent = new HashSet<>();
+        for (int seq = 0; seq < messageCount; seq++) {
+            String payload = "v-" + seq;
+            producer.newMessage().key("k-" + seq).value(payload).send();
+            sent.add(payload);
+        }
+
+        // Ack each message as soon as it is received.
+        Set<String> received = new HashSet<>();
+        for (int seq = 0; seq < messageCount; seq++) {
+            Message<String> delivered = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(delivered, "missed message #" + seq);
+            received.add(delivered.value());
+            consumer.acknowledge(delivered.id());
+        }
+        assertEquals(received, sent, "should drain every produced message");
+
+        // Close before re-attaching so the broker treats this as a fresh consumer.
+        consumer.close();
+
+        @Cleanup
+        QueueConsumer<String> reopened = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Message<String> leftover = reopened.receive(Duration.ofMillis(500));
+        assertNull(leftover,
+                "after individually acking every message on every segment, no message"
+                        + " should remain on the subscription");
+    }
+
+    /**
+     * QueueConsumer counterpart of
+     * {@link V5CumulativeAckTest#testCumulativeAckMidStreamReplaysUnackedTail()}. Drain
+     * the full stream but ack only the first {@code ackedCount} messages received (25 of
+     * 60). After close + re-attach, the unacked tail must be redelivered exactly — no
+     * already-acked message comes back, and nothing from the unacked part is dropped.
+     */
+    @Test
+    public void testPartialAcksReplayUnackedOnReattach() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "queue-partial-ack-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 60;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain all n. Ack the first ackedCount messages; leave the rest unacked.
+        // "First" is by receive order, so the acked/unacked split is recorded by value.
+        int ackedCount = 25;
+        Set<String> acked = new HashSet<>();
+        Set<String> unacked = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed message #" + i);
+            if (i < ackedCount) {
+                consumer.acknowledge(msg.id());
+                acked.add(msg.value());
+            } else {
+                unacked.add(msg.value());
+            }
+        }
+        Set<String> all = new HashSet<>(acked);
+        all.addAll(unacked);
+        assertEquals(all, sent, "consumer must see every produced message exactly once");
+
+        // Close before re-attaching so the broker treats this as a fresh consumer.
+        consumer.close();
+
+        @Cleanup
+        QueueConsumer<String> reopened = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        // Drain the replay. We expect exactly the unacked set — no acked message comes
+        // back, and every unacked message is redelivered. The loop ends on the first
+        // receive that returns null after the 500 ms timeout.
+        Set<String> replayed = new HashSet<>();
+        while (true) {
+            Message<String> msg = reopened.receive(Duration.ofMillis(500));
+            if (msg == null) {
+                break;
+            }
+            replayed.add(msg.value());
+            reopened.acknowledge(msg.id());
+        }
+        assertEquals(replayed, unacked,
+                "replay must equal exactly the unacked tail (no acked replay, no drop)");
+    }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentMergeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentMergeTest.java
new file mode 100644
index 00000000000..bbf43432bf5
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentMergeTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code admin.scalableTopics().mergeSegments(...)}: two adjacent active
+ * segments get merged into one, the originals seal, and the new child takes over their
+ * combined hash range. Producers and consumers in flight must keep working without
+ * dropping or duplicating messages.
+ */
+public class V5SegmentMergeTest extends V5ClientBaseTest {
+
+ @Test
+ public void testMergeMidFlowKeepsAllMessages() throws Exception {
+ String topic = newScalableTopic(2);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("merge-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // Produce against both initial segments.
+ int firstBatch = 50;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < firstBatch; i++) {
+ String v = "before-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Pick the two active segment IDs to merge.
+ var meta = admin.scalableTopics().getMetadata(topic);
+ List<Long> activeIds = new ArrayList<>();
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ activeIds.add(seg.getSegmentId());
+ }
+ }
+ assertEquals(activeIds.size(), 2, "expected 2 active segments before
merge");
+
+ admin.scalableTopics().mergeSegments(topic, activeIds.get(0),
activeIds.get(1));
+
+ // The merge admin call is synchronous server-side, but the V5 client's DAG watch
+ // is async — sending into the now-sealed originals before the watch delivers the
+ // new layout would fail with TopicTerminated. Poll the admin metadata until the
+ // merge is visible there (exactly 1 active segment). Note that this observes the
+ // admin-reported view, not the producer's own layout.
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ var m = admin.scalableTopics().getMetadata(topic);
+ for (var seg : m.getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 1, "merge must collapse to 1 active segment");
+ });
+
+ // Produce after the merge: must land on the new (sole) segment.
+ int secondBatch = 50;
+ for (int i = 0; i < secondBatch; i++) {
+ String v = "after-" + i;
+ producer.newMessage().key("k-after-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Drain.
+ Set<String> received = new HashSet<>();
+ int total = firstBatch + secondBatch;
+ for (int i = 0; i < total; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(msg, "missed message #" + i + " (received so far: "
+ received.size() + ")");
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+
+ assertEquals(received.size(), total, "expected " + total + " distinct
messages");
+ assertEquals(received, sent, "received set must equal sent set across
the merge");
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentSplitTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentSplitTest.java
new file mode 100644
index 00000000000..43ecef2e82c
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentSplitTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code admin.scalableTopics().splitSegment(...)}: an admin-triggered split
+ * mid-flow seals the parent and creates two child segments. Producers and consumers
+ * already attached to the topic must keep working through the layout change without
+ * dropping or duplicating messages.
+ */
+public class V5SegmentSplitTest extends V5ClientBaseTest {
+
+ @Test
+ public void testSplitMidFlowKeepsAllMessages() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("split-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ // First batch: lands on the only initial segment (id=0).
+ int firstBatch = 50;
+ Set<String> sent = new HashSet<>();
+ for (int i = 0; i < firstBatch; i++) {
+ String v = "before-" + i;
+ producer.newMessage().key("k-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Find the active segment id, then split it.
+ long activeSegmentId = -1;
+ var meta = admin.scalableTopics().getMetadata(topic);
+ for (var seg : meta.getSegments().values()) {
+ if (seg.isActive()) {
+ activeSegmentId = seg.getSegmentId();
+ break;
+ }
+ }
+ assertTrue(activeSegmentId >= 0, "expected exactly one active segment
before split");
+
+ admin.scalableTopics().splitSegment(topic, activeSegmentId);
+
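+ // The split admin call is synchronous server-side, but the V5 client's DAG watch
+ // is async — sending into the now-sealed parent before the watch delivers the new
+ // layout would fail with TopicTerminated. Poll the admin metadata until the split
+ // is visible there (exactly 2 active segments). Note that this observes the
+ // admin-reported view, not the producer's own layout.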
+ Awaitility.await().untilAsserted(() -> {
+ int active = 0;
+ var m = admin.scalableTopics().getMetadata(topic);
+ for (var seg : m.getSegments().values()) {
+ if (seg.isActive()) {
+ active++;
+ }
+ }
+ assertEquals(active, 2, "split must produce 2 active children");
+ });
+
+ // Second batch: must land on the new children.
+ int secondBatch = 50;
+ for (int i = 0; i < secondBatch; i++) {
+ String v = "after-" + i;
+ producer.newMessage().key("k-after-" + i).value(v).send();
+ sent.add(v);
+ }
+
+ // Drain everything via the consumer. Total = firstBatch + secondBatch.
+ Set<String> received = new HashSet<>();
+ int total = firstBatch + secondBatch;
+ for (int i = 0; i < total; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(msg, "missed message #" + i + " (received so far: "
+ received.size() + ")");
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+
+ // No duplicates, no losses, across the split.
+ assertEquals(received.size(), total, "expected " + total + " distinct
messages");
+ assertEquals(received, sent, "received set must equal sent set across
the split");
+ }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
index ffa7a6700e3..e4c043af6db 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
@@ -39,17 +39,20 @@ final class ClientSegmentLayout {
private final long epoch;
private final List<ActiveSegment> activeSegments;
+ private final List<ActiveSegment> sealedSegments;
private final Map<Long, String> segmentBrokerUrls;
private final String controllerBrokerUrl;
private final String controllerBrokerUrlTls;
private ClientSegmentLayout(long epoch,
List<ActiveSegment> activeSegments,
+ List<ActiveSegment> sealedSegments,
Map<Long, String> segmentBrokerUrls,
String controllerBrokerUrl,
String controllerBrokerUrlTls) {
this.epoch = epoch;
this.activeSegments = Collections.unmodifiableList(activeSegments);
+ this.sealedSegments = Collections.unmodifiableList(sealedSegments);
this.segmentBrokerUrls = Map.copyOf(segmentBrokerUrls);
this.controllerBrokerUrl = controllerBrokerUrl;
this.controllerBrokerUrlTls = controllerBrokerUrlTls;
@@ -68,25 +71,32 @@ final class ClientSegmentLayout {
brokerUrls.put(addr.getSegmentId(), addr.getBrokerUrl());
}
- // Build active segments list
+ // Partition segments into active and sealed lists.
List<ActiveSegment> activeSegments = new ArrayList<>();
+ List<ActiveSegment> sealedSegments = new ArrayList<>();
for (int i = 0; i < dag.getSegmentsCount(); i++) {
SegmentInfoProto seg = dag.getSegmentAt(i);
+ HashRange range = HashRange.of((int) seg.getHashStart(), (int)
seg.getHashEnd());
+ String segTopicName = SegmentTopicName.fromParent(
+ parentTopic, range, seg.getSegmentId()).toString();
+ ActiveSegment ref = new ActiveSegment(seg.getSegmentId(), range,
segTopicName);
if (seg.getState() ==
org.apache.pulsar.common.api.proto.SegmentState.ACTIVE) {
- HashRange range = HashRange.of((int) seg.getHashStart(), (int)
seg.getHashEnd());
- String segTopicName = SegmentTopicName.fromParent(
- parentTopic, range, seg.getSegmentId()).toString();
- activeSegments.add(new ActiveSegment(seg.getSegmentId(),
range, segTopicName));
+ activeSegments.add(ref);
+ } else if (seg.getState() ==
org.apache.pulsar.common.api.proto.SegmentState.SEALED) {
+ sealedSegments.add(ref);
}
}
- // Sort by hash range start for efficient routing
+ // Sort by hash range start for efficient routing on the active side. Sealed order
+ // doesn't matter for correctness; sort by segment id for stable iteration in
+ // tests / logs.
activeSegments.sort(Comparator.comparingInt(s ->
s.hashRange().start()));
+
sealedSegments.sort(Comparator.comparingLong(ActiveSegment::segmentId));
String controllerUrl = dag.hasControllerBrokerUrl() ?
dag.getControllerBrokerUrl() : null;
String controllerUrlTls = dag.hasControllerBrokerUrlTls() ?
dag.getControllerBrokerUrlTls() : null;
- return new ClientSegmentLayout(epoch, activeSegments, brokerUrls,
controllerUrl, controllerUrlTls);
+ return new ClientSegmentLayout(epoch, activeSegments, sealedSegments,
brokerUrls,
+ controllerUrl, controllerUrlTls);
}
long epoch() {
@@ -97,6 +107,15 @@ final class ClientSegmentLayout {
return activeSegments;
}
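+ /**
+ * Sealed segments still present in the DAG, returned as an unmodifiable list
+ * sorted by segment id. Entries are {@code ActiveSegment} instances even though
+ * the segments they describe are sealed. These have finite (eventually drained)
+ * data and a v4 consumer subscribing to one of them will receive any remaining
+ * messages and then a {@code TopicTerminatedException}.
+ */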
+ List<ActiveSegment> sealedSegments() {
+ return sealedSegments;
+ }
+
Map<Long, String> segmentBrokerUrls() {
return segmentBrokerUrls;
}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 1bbaf8905cb..eed2c41bd1f 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -286,16 +286,29 @@ final class ScalableQueueConsumer<T> implements QueueConsumer<T>, DagWatchClient
}
private CompletableFuture<Void> subscribeSegments(ClientSegmentLayout
layout) {
- var activeIds = ConcurrentHashMap.<Long>newKeySet();
+ // We subscribe to every segment present in the DAG — both ACTIVE
(current write
+ // targets) and SEALED (historical, may still hold unconsumed data for
this
+ // subscription). The receive loop drains a sealed segment naturally
and closes
+ // it on TopicTerminated; until then, the v4 consumer must remain
alive so user
+ // acks for messages received before the seal can still be forwarded.
+ var wantedIds = ConcurrentHashMap.<Long>newKeySet();
+ List<ActiveSegment> wantedSegments = new
ArrayList<>(layout.activeSegments().size()
+ + layout.sealedSegments().size());
for (var seg : layout.activeSegments()) {
- activeIds.add(seg.segmentId());
+ wantedIds.add(seg.segmentId());
+ wantedSegments.add(seg);
+ }
+ for (var seg : layout.sealedSegments()) {
+ wantedIds.add(seg.segmentId());
+ wantedSegments.add(seg);
}
- // Close consumers for segments that are no longer active
(fire-and-forget).
+ // Close consumers for segments that have dropped out of the DAG
entirely (post
+ // garbage collection). Sealed-but-still-present segments stay
subscribed.
for (var entry : segmentConsumers.entrySet()) {
- if (!activeIds.contains(entry.getKey())) {
+ if (!wantedIds.contains(entry.getKey())) {
log.info().attr("segmentId", entry.getKey())
- .log("Closing consumer for sealed segment");
+ .log("Closing consumer for segment no longer in DAG");
entry.getValue().thenAccept(c -> c.closeAsync());
segmentConsumers.remove(entry.getKey());
}
@@ -304,13 +317,15 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
// Subscribe to new segments. The returned future completes when all
subscribes
// finish (successfully or with error).
List<CompletableFuture<?>> futures = new ArrayList<>();
- for (var seg : layout.activeSegments()) {
+ for (var seg : wantedSegments) {
futures.add(segmentConsumers.computeIfAbsent(seg.segmentId(),
id -> createSegmentConsumerAsync(seg)));
}
log.info().attr("epoch", layout.epoch())
- .attr("segments", activeIds).log("Queue consumer layout
applied");
+ .attr("active", layout.activeSegments().size())
+ .attr("sealed", layout.sealedSegments().size())
+ .log("Queue consumer layout applied");
return
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
}
@@ -345,8 +360,19 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
Throwable cause = ex instanceof CompletionException ce &&
ce.getCause() != null ? ce.getCause() : ex;
if (closed
|| cause instanceof
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException) {
- // This segment consumer is done (either the whole consumer is
closing,
- // or the segment was sealed and its v4 consumer closed by a
layout update).
+ // The whole consumer is shutting down or the v4 consumer was
closed
+ // externally; stop the receive loop without touching the map.
+ return null;
+ }
+ if (cause instanceof
org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException) {
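+ // Segment is sealed and fully drained server-side. Close the v4
+ // consumer and drop it from the map — any further ack on a message
+ // already pulled from this segment is a no-op (the cursor is at the
+ // end and the entry is gone).
+ // NOTE(review): subscribeSegments() runs computeIfAbsent for every
+ // sealed segment still listed in the layout, so a later layout update
+ // would presumably re-subscribe to this drained segment — confirm that
+ // the repeated subscribe/terminate cycle is intended.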
+ log.info().attr("segmentId", segmentId)
+ .log("Sealed segment drained, closing v4 consumer");
+ segmentConsumers.remove(segmentId);
+ v4Consumer.closeAsync();
return null;
}
log.warn().attr("segmentId", segmentId)
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index 15b969d60c1..31844f069bb 100644
--- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -274,23 +274,17 @@ final class ScalableStreamConsumer<T> implements StreamConsumer<T>, DagWatchClie
}
private CompletableFuture<Void> subscribeSegments(ClientSegmentLayout
layout) {
+ // The stream consumer is controller-driven: it only subscribes to the
segments
+ // the controller currently designates as active. When a segment moves
out of the
+ // active set (because of a split or merge), we leave its v4 consumer
alive so
+ // pending acks for messages already pulled from it can still be
forwarded; the
+ // receive loop closes the consumer naturally once it sees
TopicTerminated.
var activeIds = ConcurrentHashMap.<Long>newKeySet();
for (var seg : layout.activeSegments()) {
activeIds.add(seg.segmentId());
}
- // Close consumers for segments that are no longer active
(fire-and-forget).
- for (var entry : segmentConsumers.entrySet()) {
- if (!activeIds.contains(entry.getKey())) {
- log.info().attr("segmentId", entry.getKey())
- .log("Closing consumer for sealed segment");
- entry.getValue().thenAccept(c -> c.closeAsync());
- segmentConsumers.remove(entry.getKey());
- latestDelivered.remove(entry.getKey());
- }
- }
-
- // Subscribe to new segments asynchronously.
+ // Subscribe to newly-active segments asynchronously.
List<CompletableFuture<?>> futures = new ArrayList<>();
for (var seg : layout.activeSegments()) {
futures.add(segmentConsumers.computeIfAbsent(seg.segmentId(),
@@ -346,11 +340,30 @@ final class ScalableStreamConsumer<T> implements
StreamConsumer<T>, DagWatchClie
startReceiveLoop(v4Consumer, segmentId);
}
}).exceptionally(ex -> {
- if (!closed) {
- log.warn().attr("segmentId", segmentId)
- .exception(ex).log("Error receiving from segment,
retrying");
- startReceiveLoop(v4Consumer, segmentId);
+ Throwable cause = ex instanceof
java.util.concurrent.CompletionException ce
+ && ce.getCause() != null ? ce.getCause() : ex;
+ if (closed
+ || cause instanceof
org.apache.pulsar.client.api.PulsarClientException
+ .AlreadyClosedException) {
+ // The whole consumer is shutting down or the v4 consumer was
closed
+ // externally; stop the receive loop without touching the map.
+ return null;
+ }
+ if (cause instanceof
org.apache.pulsar.client.api.PulsarClientException
+ .TopicTerminatedException) {
+ // Segment fully drained server-side. Drop it from the map and
close the
+ // v4 consumer; pending acks from this point on are no-ops
(cursor is at
+ // the end and the entry is gone).
+ log.info().attr("segmentId", segmentId)
+ .log("Sealed segment drained, closing v4 consumer");
+ segmentConsumers.remove(segmentId);
+ latestDelivered.remove(segmentId);
+ v4Consumer.closeAsync();
+ return null;
}
+ log.warn().attr("segmentId", segmentId)
+ .exception(ex).log("Error receiving from segment,
retrying");
+ startReceiveLoop(v4Consumer, segmentId);
return null;
});
}