This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a7eed351db [test][client-v5] V5 layout-dynamics coverage + consumer 
fixes (DAG replay, late-ack) (#25611)
8a7eed351db is described below

commit 8a7eed351dbfb3b6bf5213bfccc04e3f215c3b63
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Apr 29 04:11:31 2026 -0700

    [test][client-v5] V5 layout-dynamics coverage + consumer fixes (DAG replay, 
late-ack) (#25611)
---
 .../pulsar/client/api/v5/V5CumulativeAckTest.java  | 285 +++++++++++++++++++++
 .../pulsar/client/api/v5/V5DAGFollowingTest.java   | 180 +++++++++++++
 .../pulsar/client/api/v5/V5KeyRoutingTest.java     | 149 +++++++++++
 .../api/v5/V5QueueConsumerMultiSegmentAckTest.java | 163 ++++++++++++
 .../pulsar/client/api/v5/V5SegmentMergeTest.java   | 114 +++++++++
 .../pulsar/client/api/v5/V5SegmentSplitTest.java   | 114 +++++++++
 .../pulsar/client/impl/v5/ClientSegmentLayout.java |  33 ++-
 .../client/impl/v5/ScalableQueueConsumer.java      |  44 +++-
 .../client/impl/v5/ScalableStreamConsumer.java     |  45 ++--
 9 files changed, 1095 insertions(+), 32 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CumulativeAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CumulativeAckTest.java
new file mode 100644
index 00000000000..9560da10ef0
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CumulativeAckTest.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link StreamConsumer#acknowledgeCumulative(MessageId)} on a 
multi-segment
+ * scalable topic.
+ *
+ * <p>Each message returned by a V5 stream consumer carries a position vector 
— a snapshot
+ * of the latest delivered offset on every active segment as of that point in 
the stream.
+ * Cumulative-acking a single {@link MessageId} must therefore advance the 
cursor on
+ * <em>every</em> segment, not just the segment that produced this particular 
message.
+ *
+ * <p>We assert this end-to-end: produce N messages across multiple segments, 
drain them,
+ * cumulative-ack the last received id, then attach a fresh consumer on the 
same
+ * subscription. The fresh consumer must observe an empty backlog.
+ */
+public class V5CumulativeAckTest extends V5ClientBaseTest {
+
+    @Test
+    public void testCumulativeAckCoversAllSegments() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "cum-ack-sub";
+
+        // Subscribe before producing so the subscription cursor exists at the 
start of
+        // every segment — an EARLIEST consumer would also work, but this 
keeps the test
+        // unambiguous.
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 100;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        Set<String> received = new HashSet<>();
+        MessageId last = null;
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed message #" + i);
+            received.add(msg.value());
+            last = msg.id();
+        }
+        assertEquals(received, sent, "should drain every produced message");
+
+        // Single cumulative ack on the last id — the position vector embedded 
in this id
+        // must advance the cursor on every segment.
+        assertNotNull(last);
+        consumer.acknowledgeCumulative(last);
+
+        // Close and re-open the consumer on the same subscription. With a 
complete
+        // cumulative ack, the new attach must see no backlog.
+        consumer.close();
+
+        @Cleanup
+        StreamConsumer<String> reopened = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Message<String> stale = reopened.receive(Duration.ofMillis(500));
+        assertNull(stale,
+                "after a single cumulative ack of the last received id, no 
message"
+                        + " should remain unacked on any segment");
+    }
+
+    /**
+     * Cumulative ack in the middle of the stream: re-attaching the consumer 
must replay
+     * exactly the unacked tail (not the whole stream).
+     */
+    @Test
+    public void testCumulativeAckMidStreamReplaysUnackedTail() throws 
Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "cum-ack-mid-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 60;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Receive the first ackedCount messages, ack the last of those 
cumulatively.
+        int ackedCount = 25;
+        Set<String> firstHalf = new HashSet<>();
+        MessageId midId = null;
+        for (int i = 0; i < ackedCount; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed mid-stream message #" + i);
+            firstHalf.add(msg.value());
+            midId = msg.id();
+        }
+        consumer.acknowledgeCumulative(midId);
+
+        // Drain the rest into a second set so both halves stay disjoint for 
the assertion.
+        Set<String> secondHalf = new HashSet<>();
+        for (int i = 0; i < n - ackedCount; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed tail message #" + (ackedCount + i));
+            secondHalf.add(msg.value());
+        }
+        // The consumer must have seen all n messages exactly once across the 
two halves.
+        Set<String> all = new HashSet<>(firstHalf);
+        all.addAll(secondHalf);
+        assertEquals(all, sent, "consumer must see every produced message 
exactly once");
+
+        // Close before re-attaching so the broker treats this as a fresh 
consumer.
+        consumer.close();
+
+        @Cleanup
+        StreamConsumer<String> reopened = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        // Re-attach must replay only the unacked tail. The exact count can 
differ from
+        // (n - ackedCount) because the cumulative ack is per-segment and may 
include
+        // messages that hadn't been touched yet on segments not crossed by 
the position
+        // vector at midId. We assert: replayed.size() <= n - ackedCount, and 
every
+        // replayed value comes from secondHalf (i.e. nothing already-acked is 
replayed).
+        Set<String> replayed = new HashSet<>();
+        while (true) {
+            Message<String> msg = reopened.receive(Duration.ofMillis(500));
+            if (msg == null) {
+                break;
+            }
+            replayed.add(msg.value());
+        }
+        assertTrue(replayed.size() <= n - ackedCount,
+                "replay must not exceed unacked tail size (" + (n - ackedCount)
+                        + "), got " + replayed.size());
+        for (String v : replayed) {
+            assertTrue(!firstHalf.contains(v) || secondHalf.contains(v),
+                    "replayed value " + v + " was already cumulative-acked");
+        }
+    }
+
+    /**
+     * Cumulative ack must continue to do the right thing across a split: 
messages from
+     * the now-sealed parent segment that are covered by the ack stay acked, 
and the cursor
+     * advances onto the children. Re-attaching after the split sees only the 
post-ack tail.
+     */
+    @Test
+    public void testCumulativeAckCrossesSealedParentToChildren() throws 
Exception {
+        String topic = newScalableTopic(1);
+        String subscription = "cum-ack-cross-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Parent batch on the only initial segment.
+        int parentBatch = 30;
+        for (int i = 0; i < parentBatch; i++) {
+            producer.newMessage().key("k-" + i).value("parent-" + i).send();
+        }
+
+        // Drain the parent batch and cumulative-ack the last one. That should 
fully ack
+        // the (still active) parent segment.
+        MessageId lastParent = null;
+        for (int i = 0; i < parentBatch; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed parent #" + i);
+            lastParent = msg.id();
+        }
+        consumer.acknowledgeCumulative(lastParent);
+
+        // Split.
+        long parentId = -1;
+        var meta = admin.scalableTopics().getMetadata(topic);
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                parentId = seg.getSegmentId();
+                break;
+            }
+        }
+        assertTrue(parentId >= 0);
+        admin.scalableTopics().splitSegment(topic, parentId);
+
+        // The split is synchronous server-side, but the V5 client's DAG watch 
is async —
+        // sending before the watch delivers the new layout would fail with 
TopicTerminated.
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            var m = admin.scalableTopics().getMetadata(topic);
+            for (var seg : m.getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 2, "split must produce 2 active children");
+        });
+
+        // Child batch on the new children.
+        int childBatch = 30;
+        Set<String> childSent = new HashSet<>();
+        for (int i = 0; i < childBatch; i++) {
+            String v = "child-" + i;
+            producer.newMessage().key("k-child-" + i).value(v).send();
+            childSent.add(v);
+        }
+
+        // Drain children and cumulative-ack the last one.
+        Set<String> childReceived = new HashSet<>();
+        MessageId lastChild = null;
+        for (int i = 0; i < childBatch; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed child #" + i);
+            childReceived.add(msg.value());
+            lastChild = msg.id();
+        }
+        assertEquals(childReceived, childSent, "consumer must see every child 
message");
+        consumer.acknowledgeCumulative(lastChild);
+        consumer.close();
+
+        // Re-attach: parent fully acked + every child acked → no backlog.
+        @Cleanup
+        StreamConsumer<String> reopened = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Message<String> stale = reopened.receive(Duration.ofMillis(500));
+        assertNull(stale,
+                "after cumulative-acking parent and all children, no message"
+                        + " should remain on the subscription");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DAGFollowingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DAGFollowingTest.java
new file mode 100644
index 00000000000..461e20c0106
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DAGFollowingTest.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for DAG-following on the QueueConsumer:
+ * <ul>
+ *   <li>An existing subscription transparently drains across a chain of 
splits.</li>
+ *   <li>A brand-new subscription created with EARLIEST after a split replays 
the sealed
+ *       parent's data before transitioning onto the active children — the 
QueueConsumer
+ *       subscribes to every segment in the DAG (active + sealed), not just 
the active
+ *       ones.</li>
+ * </ul>
+ */
+public class V5DAGFollowingTest extends V5ClientBaseTest {
+
+    @Test
+    public void testNewEarliestSubscriptionReplaysSealedParent() throws 
Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Write data to the only initial segment, then split it. After the 
split the
+        // parent is sealed and two children are active.
+        int parentBatch = 30;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < parentBatch; i++) {
+            String v = "parent-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        admin.scalableTopics().splitSegment(topic, activeIds(topic).get(0));
+        Awaitility.await().untilAsserted(() -> 
assertEquals(activeIds(topic).size(), 2));
+
+        // Produce more — these route through the new active children.
+        int childBatch = 30;
+        for (int i = 0; i < childBatch; i++) {
+            String v = "child-" + i;
+            producer.newMessage().key("k-child-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Brand-new subscription with EARLIEST. It must see both the sealed 
parent's
+        // backlog and the children's data, in some order.
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("new-earliest-replay")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> received = new HashSet<>();
+        int total = parentBatch + childBatch;
+        for (int i = 0; i < total; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i
+                    + " (received so far: " + received.size() + "/" + total + 
")");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received, sent,
+                "new EARLIEST subscription must replay sealed-parent + 
children");
+    }
+
+    @Test
+    public void testMultiGenerationDAGFollowing() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("multi-gen")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        Set<String> sent = new HashSet<>();
+        int batch = 20;
+
+        // Generation 0: single segment.
+        for (int i = 0; i < batch; i++) {
+            String v = "g0-" + i;
+            producer.newMessage().key("k0-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Split 0 → {1, 2}: now 2 active. The admin call is synchronous 
server-side,
+        // but the V5 client's DAG watch is async — wait for the producer's 
view to catch
+        // up before sending into the new layout (otherwise the next send hits 
the now-
+        // sealed parent and fails with TopicTerminated).
+        admin.scalableTopics().splitSegment(topic, activeIds(topic).get(0));
+        waitForActiveCount(topic, 2);
+
+        // Generation 1: 2 segments.
+        for (int i = 0; i < batch; i++) {
+            String v = "g1-" + i;
+            producer.newMessage().key("k1-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Split one of the children: pick the smaller-id active child for 
determinism.
+        // After this we have 3 active.
+        admin.scalableTopics().splitSegment(topic, activeIds(topic).get(0));
+        waitForActiveCount(topic, 3);
+
+        // Generation 2: 3 segments.
+        for (int i = 0; i < batch; i++) {
+            String v = "g2-" + i;
+            producer.newMessage().key("k2-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain: 3 generations × batch each.
+        Set<String> received = new HashSet<>();
+        int total = 3 * batch;
+        for (int i = 0; i < total; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i
+                    + " (received so far: " + received.size() + "/" + total + 
")");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+
+        assertEquals(received.size(), total, "expected " + total + " distinct 
messages");
+        assertEquals(received, sent, "received set must cover every 
generation");
+    }
+
+    private List<Long> activeIds(String topic) throws Exception {
+        var meta = admin.scalableTopics().getMetadata(topic);
+        List<Long> ids = new ArrayList<>();
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                ids.add(seg.getSegmentId());
+            }
+        }
+        java.util.Collections.sort(ids);
+        return ids;
+    }
+
+    private void waitForActiveCount(String topic, int expected) {
+        Awaitility.await().untilAsserted(() -> 
assertEquals(activeIds(topic).size(), expected,
+                "layout never converged to " + expected + " active segments"));
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5KeyRoutingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5KeyRoutingTest.java
new file mode 100644
index 00000000000..0e023e84df3
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5KeyRoutingTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 producer's key-based routing on a multi-segment 
scalable topic.
+ *
+ * <p>The V5 producer hashes the message key (MurmurHash3, masked to 16 bits) 
and routes
+ * to the active segment whose hash range covers the hash. We don't assert 
which segment
+ * a key lands on (an internal detail), but we assert the two observable 
contracts:
+ * <ul>
+ *   <li>Same key → same segment, every time. We verify this by sending many 
messages with
+ *       a small set of keys and checking that the per-key receive order 
matches the
+ *       per-key send order on a {@link StreamConsumer} (which preserves order 
within a
+ *       segment but not across segments).</li>
+ *   <li>Different keys spread across segments. With four segments and a 
varied key set,
+ *       all four segments receive at least one message — we verify this by 
inspecting the
+ *       admin stats for each segment's per-segment topic.</li>
+ * </ul>
+ */
+public class V5KeyRoutingTest extends V5ClientBaseTest {
+
+    @Test
+    public void testSameKeyPreservesOrder() throws Exception {
+        String topic = newScalableTopic(4);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("key-route")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // 5 keys × 20 messages each, interleaved — same-key messages must 
arrive in send
+        // order (which only holds if every same-key message lands on the same 
segment).
+        List<String> keys = List.of("alpha", "bravo", "charlie", "delta", 
"echo");
+        int perKey = 20;
+        Map<String, List<String>> sent = new HashMap<>();
+        for (String k : keys) {
+            sent.put(k, new ArrayList<>());
+        }
+        for (int i = 0; i < perKey; i++) {
+            for (String k : keys) {
+                String value = k + "-" + i;
+                producer.newMessage().key(k).value(value).send();
+                sent.get(k).add(value);
+            }
+        }
+
+        Map<String, List<String>> received = new HashMap<>();
+        for (String k : keys) {
+            received.put(k, new ArrayList<>());
+        }
+        int total = keys.size() * perKey;
+        for (int i = 0; i < total; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed message #" + i);
+            String key = msg.key().orElseThrow(() -> new 
AssertionError("missing key"));
+            received.get(key).add(msg.value());
+            consumer.acknowledgeCumulative(msg.id());
+        }
+
+        for (String k : keys) {
+            assertEquals(received.get(k), sent.get(k),
+                    "per-key order must be preserved for key=" + k);
+        }
+    }
+
+    @Test
+    public void testMultiSegmentEndToEndCount() throws Exception {
+        String topic = newScalableTopic(4);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("multi-seg-count")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // 200 distinct keys: every send should land on some segment, every 
message should
+        // come out exactly once. This is the basic "scalable topic with N>1 
segments
+        // doesn't drop or duplicate messages" sanity check.
+        int n = 200;
+        java.util.Set<String> sent = new java.util.HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String value = "v-" + i;
+            producer.newMessage().key("k-" + i).value(value).send();
+            sent.add(value);
+        }
+
+        java.util.Set<String> received = new java.util.HashSet<>();
+        MessageId last = null;
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed message #" + i);
+            received.add(msg.value());
+            last = msg.id();
+        }
+        consumer.acknowledgeCumulative(last);
+
+        assertEquals(received.size(), n, "expected " + n + " distinct 
messages");
+        assertEquals(received, sent, "received set must match sent set");
+
+        var meta = admin.scalableTopics().getMetadata(topic);
+        int active = 0;
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                active++;
+            }
+        }
+        assertEquals(active, 4, "expected 4 active segments");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerMultiSegmentAckTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerMultiSegmentAckTest.java
new file mode 100644
index 00000000000..ea557a2ce6b
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerMultiSegmentAckTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * QueueConsumer counterpart of {@link 
V5CumulativeAckTest#testCumulativeAckCoversAllSegments()}.
+ *
+ * <p>QueueConsumer doesn't expose cumulative ack — each {@link 
QueueConsumer#acknowledge}
+ * call covers exactly one message. We assert that individually acking every 
message on a
+ * multi-segment topic correctly advances the cursor on every per-segment v4 
consumer:
+ * after a close + re-attach on the same subscription, the new consumer must 
observe an
+ * empty backlog.
+ */
+public class V5QueueConsumerMultiSegmentAckTest extends V5ClientBaseTest {
+
+    @Test
+    public void testIndividualAcksCoverAllSegments() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "queue-multi-seg-ack-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 100;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed message #" + i);
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received, sent, "should drain every produced message");
+
+        // Close before re-attaching so the broker treats this as a fresh 
consumer.
+        consumer.close();
+
+        @Cleanup
+        QueueConsumer<String> reopened = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        Message<String> stale = reopened.receive(Duration.ofMillis(500));
+        assertNull(stale,
+                "after individually acking every message on every segment, no 
message"
+                        + " should remain on the subscription");
+    }
+
+    /**
+     * QueueConsumer counterpart of
+     * {@link 
V5CumulativeAckTest#testCumulativeAckMidStreamReplaysUnackedTail()}. Drain
+     * the full stream but only ack the first half. After close + re-attach, 
the unacked
+     * tail must be redelivered exactly — no already-acked message comes back, 
and
+     * nothing from the unacked half is dropped.
+     */
+    @Test
+    public void testPartialAcksReplayUnackedOnReattach() throws Exception {
+        String topic = newScalableTopic(4);
+        String subscription = "queue-partial-ack-sub";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int n = 60;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            String v = "v-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain all n. Ack the first ackedCount messages; leave the rest 
unacked.
+        int ackedCount = 25;
+        Set<String> acked = new HashSet<>();
+        Set<String> unacked = new HashSet<>();
+        for (int i = 0; i < n; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "missed message #" + i);
+            if (i < ackedCount) {
+                consumer.acknowledge(msg.id());
+                acked.add(msg.value());
+            } else {
+                unacked.add(msg.value());
+            }
+        }
+        Set<String> all = new HashSet<>(acked);
+        all.addAll(unacked);
+        assertEquals(all, sent, "consumer must see every produced message 
exactly once");
+
+        // Close before re-attaching so the broker treats this as a fresh 
consumer.
+        consumer.close();
+
+        @Cleanup
+        QueueConsumer<String> reopened = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName(subscription)
+                .subscribe();
+
+        // Drain the replay. We expect exactly the unacked set — no acked 
message comes
+        // back, and every unacked message is redelivered.
+        Set<String> replayed = new HashSet<>();
+        while (true) {
+            Message<String> msg = reopened.receive(Duration.ofMillis(500));
+            if (msg == null) {
+                break;
+            }
+            replayed.add(msg.value());
+            reopened.acknowledge(msg.id());
+        }
+        assertEquals(replayed, unacked,
+                "replay must equal exactly the unacked tail (no acked replay, 
no drop)");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentMergeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentMergeTest.java
new file mode 100644
index 00000000000..bbf43432bf5
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentMergeTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code admin.scalableTopics().mergeSegments(...)}: two 
adjacent active
+ * segments get merged into one, the originals seal, and the new child takes 
over their
+ * combined hash range. Producers and consumers in flight must keep working 
without
+ * drop or duplication.
+ */
+public class V5SegmentMergeTest extends V5ClientBaseTest {
+
+    @Test
+    public void testMergeMidFlowKeepsAllMessages() throws Exception {
+        String topic = newScalableTopic(2);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("merge-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // Produce against both initial segments.
+        int firstBatch = 50;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < firstBatch; i++) {
+            String v = "before-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Pick the two active segment IDs to merge.
+        var meta = admin.scalableTopics().getMetadata(topic);
+        List<Long> activeIds = new ArrayList<>();
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                activeIds.add(seg.getSegmentId());
+            }
+        }
+        assertEquals(activeIds.size(), 2, "expected 2 active segments before 
merge");
+
+        admin.scalableTopics().mergeSegments(topic, activeIds.get(0), 
activeIds.get(1));
+
+        // The merge admin call is synchronous server-side, but the V5 
client's DAG watch
+        // is async — sending into the now-sealed children before the watch 
delivers the
+        // new layout would fail with TopicTerminated. Wait for the producer's 
view to
+        // catch up.
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            var m = admin.scalableTopics().getMetadata(topic);
+            for (var seg : m.getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 1, "merge must collapse to 1 active segment");
+        });
+
+        // Produce after the merge: must land on the new (sole) segment.
+        int secondBatch = 50;
+        for (int i = 0; i < secondBatch; i++) {
+            String v = "after-" + i;
+            producer.newMessage().key("k-after-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain.
+        Set<String> received = new HashSet<>();
+        int total = firstBatch + secondBatch;
+        for (int i = 0; i < total; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i + " (received so far: " 
+ received.size() + ")");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+
+        assertEquals(received.size(), total, "expected " + total + " distinct 
messages");
+        assertEquals(received, sent, "received set must equal sent set across 
the merge");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentSplitTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentSplitTest.java
new file mode 100644
index 00000000000..43ecef2e82c
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SegmentSplitTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.awaitility.Awaitility;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code admin.scalableTopics().splitSegment(...)}: an 
admin-triggered split
+ * mid-flow seals the parent and creates two child segments. Producers and 
consumers
+ * already attached to the topic must keep working through the layout change 
without
+ * dropping or duplicating messages.
+ */
+public class V5SegmentSplitTest extends V5ClientBaseTest {
+
+    @Test
+    public void testSplitMidFlowKeepsAllMessages() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("split-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        // First batch: lands on the only initial segment (id=0).
+        int firstBatch = 50;
+        Set<String> sent = new HashSet<>();
+        for (int i = 0; i < firstBatch; i++) {
+            String v = "before-" + i;
+            producer.newMessage().key("k-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Find the active segment id, then split it.
+        long activeSegmentId = -1;
+        var meta = admin.scalableTopics().getMetadata(topic);
+        for (var seg : meta.getSegments().values()) {
+            if (seg.isActive()) {
+                activeSegmentId = seg.getSegmentId();
+                break;
+            }
+        }
+        assertTrue(activeSegmentId >= 0, "expected exactly one active segment 
before split");
+
+        admin.scalableTopics().splitSegment(topic, activeSegmentId);
+
+        // The split admin call is synchronous server-side, but the V5 
client's DAG watch
+        // is async — sending into the now-sealed parent before the watch 
delivers the new
+        // layout would fail with TopicTerminated. Wait for the producer's 
view to catch up.
+        Awaitility.await().untilAsserted(() -> {
+            int active = 0;
+            var m = admin.scalableTopics().getMetadata(topic);
+            for (var seg : m.getSegments().values()) {
+                if (seg.isActive()) {
+                    active++;
+                }
+            }
+            assertEquals(active, 2, "split must produce 2 active children");
+        });
+
+        // Second batch: must land on the new children.
+        int secondBatch = 50;
+        for (int i = 0; i < secondBatch; i++) {
+            String v = "after-" + i;
+            producer.newMessage().key("k-after-" + i).value(v).send();
+            sent.add(v);
+        }
+
+        // Drain everything via the consumer. Total = firstBatch + secondBatch.
+        Set<String> received = new HashSet<>();
+        int total = firstBatch + secondBatch;
+        for (int i = 0; i < total; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg, "missed message #" + i + " (received so far: " 
+ received.size() + ")");
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+
+        // No duplicates, no losses, across the split.
+        assertEquals(received.size(), total, "expected " + total + " distinct 
messages");
+        assertEquals(received, sent, "received set must equal sent set across 
the split");
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
index ffa7a6700e3..e4c043af6db 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ClientSegmentLayout.java
@@ -39,17 +39,20 @@ final class ClientSegmentLayout {
 
     private final long epoch;
     private final List<ActiveSegment> activeSegments;
+    private final List<ActiveSegment> sealedSegments;
     private final Map<Long, String> segmentBrokerUrls;
     private final String controllerBrokerUrl;
     private final String controllerBrokerUrlTls;
 
     private ClientSegmentLayout(long epoch,
                                 List<ActiveSegment> activeSegments,
+                                List<ActiveSegment> sealedSegments,
                                 Map<Long, String> segmentBrokerUrls,
                                 String controllerBrokerUrl,
                                 String controllerBrokerUrlTls) {
         this.epoch = epoch;
         this.activeSegments = Collections.unmodifiableList(activeSegments);
+        this.sealedSegments = Collections.unmodifiableList(sealedSegments);
         this.segmentBrokerUrls = Map.copyOf(segmentBrokerUrls);
         this.controllerBrokerUrl = controllerBrokerUrl;
         this.controllerBrokerUrlTls = controllerBrokerUrlTls;
@@ -68,25 +71,32 @@ final class ClientSegmentLayout {
             brokerUrls.put(addr.getSegmentId(), addr.getBrokerUrl());
         }
 
-        // Build active segments list
+        // Partition segments into active and sealed lists.
         List<ActiveSegment> activeSegments = new ArrayList<>();
+        List<ActiveSegment> sealedSegments = new ArrayList<>();
         for (int i = 0; i < dag.getSegmentsCount(); i++) {
             SegmentInfoProto seg = dag.getSegmentAt(i);
+            HashRange range = HashRange.of((int) seg.getHashStart(), (int) 
seg.getHashEnd());
+            String segTopicName = SegmentTopicName.fromParent(
+                    parentTopic, range, seg.getSegmentId()).toString();
+            ActiveSegment ref = new ActiveSegment(seg.getSegmentId(), range, 
segTopicName);
             if (seg.getState() == 
org.apache.pulsar.common.api.proto.SegmentState.ACTIVE) {
-                HashRange range = HashRange.of((int) seg.getHashStart(), (int) 
seg.getHashEnd());
-                String segTopicName = SegmentTopicName.fromParent(
-                        parentTopic, range, seg.getSegmentId()).toString();
-                activeSegments.add(new ActiveSegment(seg.getSegmentId(), 
range, segTopicName));
+                activeSegments.add(ref);
+            } else if (seg.getState() == 
org.apache.pulsar.common.api.proto.SegmentState.SEALED) {
+                sealedSegments.add(ref);
             }
         }
 
-        // Sort by hash range start for efficient routing
+        // Sort by hash range start for efficient routing on the active side. 
Sealed order
+        // doesn't matter for correctness; sort for stable iteration in tests 
/ logs.
         activeSegments.sort(Comparator.comparingInt(s -> 
s.hashRange().start()));
+        
sealedSegments.sort(Comparator.comparingLong(ActiveSegment::segmentId));
 
         String controllerUrl = dag.hasControllerBrokerUrl() ? 
dag.getControllerBrokerUrl() : null;
         String controllerUrlTls = dag.hasControllerBrokerUrlTls() ? 
dag.getControllerBrokerUrlTls() : null;
 
-        return new ClientSegmentLayout(epoch, activeSegments, brokerUrls, 
controllerUrl, controllerUrlTls);
+        return new ClientSegmentLayout(epoch, activeSegments, sealedSegments, 
brokerUrls,
+                controllerUrl, controllerUrlTls);
     }
 
     long epoch() {
@@ -97,6 +107,15 @@ final class ClientSegmentLayout {
         return activeSegments;
     }
 
+    /**
+     * Sealed segments still present in the DAG. These have finite (eventually 
drained)
+     * data and a v4 consumer subscribing to one of them will receive any 
remaining
+     * messages and then a {@code TopicTerminatedException}.
+     */
+    List<ActiveSegment> sealedSegments() {
+        return sealedSegments;
+    }
+
     Map<Long, String> segmentBrokerUrls() {
         return segmentBrokerUrls;
     }
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 1bbaf8905cb..eed2c41bd1f 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -286,16 +286,29 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
     }
 
     private CompletableFuture<Void> subscribeSegments(ClientSegmentLayout 
layout) {
-        var activeIds = ConcurrentHashMap.<Long>newKeySet();
+        // We subscribe to every segment present in the DAG — both ACTIVE 
(current write
+        // targets) and SEALED (historical, may still hold unconsumed data for 
this
+        // subscription). The receive loop drains a sealed segment naturally 
and closes
+        // it on TopicTerminated; until then, the v4 consumer must remain 
alive so user
+        // acks for messages received before the seal can still be forwarded.
+        var wantedIds = ConcurrentHashMap.<Long>newKeySet();
+        List<ActiveSegment> wantedSegments = new 
ArrayList<>(layout.activeSegments().size()
+                + layout.sealedSegments().size());
         for (var seg : layout.activeSegments()) {
-            activeIds.add(seg.segmentId());
+            wantedIds.add(seg.segmentId());
+            wantedSegments.add(seg);
+        }
+        for (var seg : layout.sealedSegments()) {
+            wantedIds.add(seg.segmentId());
+            wantedSegments.add(seg);
         }
 
-        // Close consumers for segments that are no longer active 
(fire-and-forget).
+        // Close consumers for segments that have dropped out of the DAG 
entirely (post
+        // garbage collection). Sealed-but-still-present segments stay 
subscribed.
         for (var entry : segmentConsumers.entrySet()) {
-            if (!activeIds.contains(entry.getKey())) {
+            if (!wantedIds.contains(entry.getKey())) {
                 log.info().attr("segmentId", entry.getKey())
-                        .log("Closing consumer for sealed segment");
+                        .log("Closing consumer for segment no longer in DAG");
                 entry.getValue().thenAccept(c -> c.closeAsync());
                 segmentConsumers.remove(entry.getKey());
             }
@@ -304,13 +317,15 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
         // Subscribe to new segments. The returned future completes when all 
subscribes
         // finish (successfully or with error).
         List<CompletableFuture<?>> futures = new ArrayList<>();
-        for (var seg : layout.activeSegments()) {
+        for (var seg : wantedSegments) {
             futures.add(segmentConsumers.computeIfAbsent(seg.segmentId(),
                     id -> createSegmentConsumerAsync(seg)));
         }
 
         log.info().attr("epoch", layout.epoch())
-                .attr("segments", activeIds).log("Queue consumer layout 
applied");
+                .attr("active", layout.activeSegments().size())
+                .attr("sealed", layout.sealedSegments().size())
+                .log("Queue consumer layout applied");
         return 
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
     }
 
@@ -345,8 +360,19 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
             Throwable cause = ex instanceof CompletionException ce && 
ce.getCause() != null ? ce.getCause() : ex;
             if (closed
                     || cause instanceof 
org.apache.pulsar.client.api.PulsarClientException.AlreadyClosedException) {
-                // This segment consumer is done (either the whole consumer is 
closing,
-                // or the segment was sealed and its v4 consumer closed by a 
layout update).
+                // The whole consumer is shutting down or the v4 consumer was 
closed
+                // externally; stop the receive loop without touching the map.
+                return null;
+            }
+            if (cause instanceof 
org.apache.pulsar.client.api.PulsarClientException.TopicTerminatedException) {
+                // Segment is sealed and fully drained server-side. Close the 
v4
+                // consumer and drop it from the map — any further ack on a 
message
+                // already pulled from this segment is a no-op (the cursor is 
at the
+                // end and the entry is gone).
+                log.info().attr("segmentId", segmentId)
+                        .log("Sealed segment drained, closing v4 consumer");
+                segmentConsumers.remove(segmentId);
+                v4Consumer.closeAsync();
                 return null;
             }
             log.warn().attr("segmentId", segmentId)
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index 15b969d60c1..31844f069bb 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -274,23 +274,17 @@ final class ScalableStreamConsumer<T> implements 
StreamConsumer<T>, DagWatchClie
     }
 
     private CompletableFuture<Void> subscribeSegments(ClientSegmentLayout 
layout) {
+        // The stream consumer is controller-driven: it only subscribes to the 
segments
+        // the controller currently designates as active. When a segment moves 
out of the
+        // active set (because of a split or merge), we leave its v4 consumer 
alive so
+        // pending acks for messages already pulled from it can still be 
forwarded; the
+        // receive loop closes the consumer naturally once it sees 
TopicTerminated.
         var activeIds = ConcurrentHashMap.<Long>newKeySet();
         for (var seg : layout.activeSegments()) {
             activeIds.add(seg.segmentId());
         }
 
-        // Close consumers for segments that are no longer active 
(fire-and-forget).
-        for (var entry : segmentConsumers.entrySet()) {
-            if (!activeIds.contains(entry.getKey())) {
-                log.info().attr("segmentId", entry.getKey())
-                        .log("Closing consumer for sealed segment");
-                entry.getValue().thenAccept(c -> c.closeAsync());
-                segmentConsumers.remove(entry.getKey());
-                latestDelivered.remove(entry.getKey());
-            }
-        }
-
-        // Subscribe to new segments asynchronously.
+        // Subscribe to newly-active segments asynchronously.
         List<CompletableFuture<?>> futures = new ArrayList<>();
         for (var seg : layout.activeSegments()) {
             futures.add(segmentConsumers.computeIfAbsent(seg.segmentId(),
@@ -346,11 +340,30 @@ final class ScalableStreamConsumer<T> implements 
StreamConsumer<T>, DagWatchClie
                 startReceiveLoop(v4Consumer, segmentId);
             }
         }).exceptionally(ex -> {
-            if (!closed) {
-                log.warn().attr("segmentId", segmentId)
-                        .exception(ex).log("Error receiving from segment, 
retrying");
-                startReceiveLoop(v4Consumer, segmentId);
+            Throwable cause = ex instanceof 
java.util.concurrent.CompletionException ce
+                    && ce.getCause() != null ? ce.getCause() : ex;
+            if (closed
+                    || cause instanceof 
org.apache.pulsar.client.api.PulsarClientException
+                            .AlreadyClosedException) {
+                // The whole consumer is shutting down or the v4 consumer was 
closed
+                // externally; stop the receive loop without touching the map.
+                return null;
+            }
+            if (cause instanceof 
org.apache.pulsar.client.api.PulsarClientException
+                    .TopicTerminatedException) {
+                // Segment fully drained server-side. Drop it from the map and 
close the
+                // v4 consumer; pending acks from this point on are no-ops 
(cursor is at
+                // the end and the entry is gone).
+                log.info().attr("segmentId", segmentId)
+                        .log("Sealed segment drained, closing v4 consumer");
+                segmentConsumers.remove(segmentId);
+                latestDelivered.remove(segmentId);
+                v4Consumer.closeAsync();
+                return null;
             }
+            log.warn().attr("segmentId", segmentId)
+                    .exception(ex).log("Error receiving from segment, 
retrying");
+            startReceiveLoop(v4Consumer, segmentId);
             return null;
         });
     }

Reply via email to