This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f92a4a059be [feat] PIP-468: Basic end-to-end tests for V5 
Queue/Stream/Checkpoint consumers + async APIs (#25587)
f92a4a059be is described below

commit f92a4a059bebe481f4d35a1ba7c7cb97b420a183
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Apr 28 08:32:44 2026 -0700

    [feat] PIP-468: Basic end-to-end tests for V5 Queue/Stream/Checkpoint 
consumers + async APIs (#25587)
---
 .../pulsar/client/api/v5/V5AsyncApisTest.java      | 237 +++++++++++++++++++
 .../api/v5/V5CheckpointConsumerBasicTest.java      | 254 +++++++++++++++++++++
 .../pulsar/client/api/v5/V5ClientBaseTest.java     |  50 +++-
 .../client/api/v5/V5QueueConsumerBasicTest.java    | 194 ++++++++++++++++
 .../client/api/v5/V5StreamConsumerBasicTest.java   | 147 ++++++++++++
 .../client/impl/v5/ScalableCheckpointConsumer.java |  37 ++-
 .../client/impl/v5/ScalableQueueConsumer.java      |   8 +-
 .../client/impl/v5/ScalableStreamConsumer.java     |   7 +-
 8 files changed, 915 insertions(+), 19 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
new file mode 100644
index 00000000000..54164b23339
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.async.AsyncCheckpointConsumer;
+import org.apache.pulsar.client.api.v5.async.AsyncProducer;
+import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
+import org.apache.pulsar.client.api.v5.async.AsyncStreamConsumer;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 async views: {@code Producer.async()},
+ * {@code QueueConsumer.async()}, {@code StreamConsumer.async()}, and
+ * {@code CheckpointConsumer.async()}.
+ *
+ * <p>Each scenario verifies that the future returned by an async call (a) eventually
+ * completes, (b) carries the right value (MessageId / Message / Checkpoint), and (c)
+ * doesn't block the calling thread synchronously beyond what the wire requires.
+ *
+ * <p>Single-segment scalable topic — multi-segment / cross-segment async behavior lives
+ * in the dedicated scalable suites.
+ */
+public class V5AsyncApisTest extends V5ClientBaseTest {
+
+    private static final Duration AWAIT = Duration.ofSeconds(10);
+
+    /**
+     * Issues a batch of async sends without awaiting them one by one, then checks that once
+     * {@code flush()} completes every send future is done and carries a MessageId, and
+     * finally drains the topic to confirm the messages arrived in send order.
+     */
+    @Test
+    public void testAsyncProducerSendAndFlush() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        // Subscribe before the first send: with the default LATEST initial position the
+        // cursor has to exist already for the messages below to be delivered.
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("async-flush-sub")
+                .subscribe();
+
+        AsyncProducer<String> asyncProducer = producer.async();
+
+        // Queue up every send first; nothing is awaited inside this loop.
+        int messageCount = 20;
+        List<CompletableFuture<MessageId>> pendingSends = new ArrayList<>();
+        for (int i = 0; i < messageCount; i++) {
+            CompletableFuture<MessageId> pending = asyncProducer.newMessage().value("msg-" + i).send();
+            pendingSends.add(pending);
+        }
+
+        // Once flush() has completed, every previously issued send must be acknowledged.
+        asyncProducer.flush().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+        int index = 0;
+        for (CompletableFuture<MessageId> pending : pendingSends) {
+            assertTrue(pending.isDone(),
+                    "send future " + index + " must be done after flush()");
+            assertNotNull(pending.getNow(null),
+                    "send future " + index + " must carry a non-null MessageId");
+            index++;
+        }
+
+        // Consume everything back to prove the messages landed, and in order.
+        for (int i = 0; i < messageCount; i++) {
+            String expected = "msg-" + i;
+            Message<String> received = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(received, "expected " + expected);
+            assertEquals(received.value(), expected);
+            consumer.acknowledge(received.id());
+        }
+    }
+
+    /** A single async send must complete within the await window and yield a MessageId. */
+    @Test
+    public void testAsyncProducerSendCarriesMessageId() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        AsyncProducer<String> asyncProducer = producer.async();
+        CompletableFuture<MessageId> pendingSend = asyncProducer.newMessage().value("hello-async").send();
+        MessageId id = pendingSend.get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+        assertNotNull(id, "sendAsync must complete with a non-null MessageId");
+    }
+
+    @Test
+    public void testAsyncQueueConsumerReceiveAndAck() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("async-q-sub")
+                .subscribe();
+
+        AsyncQueueConsumer<String> async = consumer.async();
+
+        producer.newMessage().value("hi").send();
+
+        Message<String> msg = async.receive().get(AWAIT.toMillis(), 
TimeUnit.MILLISECONDS);
+        assertNotNull(msg);
+        assertEquals(msg.value(), "hi");
+        // acknowledge() is fire-and-forget on the async view; verify no 
message remains.
+        async.acknowledge(msg.id());
+
+        // Nothing should redeliver after a successful ack.
+        assertNull(consumer.receive(Duration.ofMillis(200)),
+                "ack via async view did not stick");
+    }
+
+    /**
+     * Reads a short ordered run through the async stream-consumer view, cumulatively
+     * acknowledges the last message, and checks that nothing else is delivered afterwards.
+     */
+    @Test
+    public void testAsyncStreamConsumerReceiveAndCumulativeAck() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("async-stream-sub")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        AsyncStreamConsumer<String> asyncConsumer = consumer.async();
+        long timeoutMs = AWAIT.toMillis();
+
+        int messageCount = 5;
+        for (int i = 0; i < messageCount; i++) {
+            producer.newMessage().value("m-" + i).send();
+        }
+
+        // Remember the id of the most recent message so a single cumulative ack covers all.
+        MessageId lastSeen = null;
+        for (int i = 0; i < messageCount; i++) {
+            Message<String> received = asyncConsumer.receive().get(timeoutMs, TimeUnit.MILLISECONDS);
+            assertNotNull(received);
+            assertEquals(received.value(), "m-" + i, "async stream consumer out of order at " + i);
+            lastSeen = received.id();
+        }
+        asyncConsumer.acknowledgeCumulative(lastSeen);
+
+        Message<String> leftover = consumer.receive(Duration.ofMillis(200));
+        assertNull(leftover, "cumulative ack via async view did not stick");
+    }
+
+    /**
+     * Exercises {@code checkpoint()} and {@code seek()} on the async checkpoint-consumer
+     * view: read three messages, snapshot, read three more, seek back to the snapshot and
+     * expect the post-checkpoint window to be delivered again.
+     *
+     * <p>Every received message is asserted non-null before it is dereferenced, matching
+     * the other async tests in this class, so a missing message fails as a labelled
+     * assertion rather than a bare NullPointerException.
+     */
+    @Test
+    public void testAsyncCheckpointConsumerCheckpointAndSeek() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 6; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        AsyncCheckpointConsumer<String> async = consumer.async();
+
+        // Read 3, snapshot via async, read 3 more, then async-seek back.
+        for (int i = 0; i < 3; i++) {
+            Message<String> msg = async.receive().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+            assertNotNull(msg, "expected v-" + i + " before the checkpoint");
+            assertEquals(msg.value(), "v-" + i);
+        }
+        Checkpoint mark = async.checkpoint().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+        assertNotNull(mark, "async checkpoint must complete with a non-null position");
+
+        for (int i = 3; i < 6; i++) {
+            Message<String> msg = async.receive().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+            assertNotNull(msg, "expected v-" + i + " after the checkpoint");
+            assertEquals(msg.value(), "v-" + i);
+        }
+
+        async.seek(mark).get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+        // Collected as a set: this test only checks which messages come back, not their order.
+        Set<String> redelivered = new HashSet<>();
+        for (int i = 0; i < 3; i++) {
+            Message<String> msg = async.receive().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+            assertNotNull(msg, "async seek redelivered only " + i + " of 3 messages");
+            redelivered.add(msg.value());
+        }
+        assertEquals(redelivered, Set.of("v-3", "v-4", "v-5"),
+                "async seek did not redeliver the post-checkpoint window");
+    }
+
+    @Test
+    public void testAsyncCloseCompletes() throws Exception {
+        String topic = newScalableTopic(1);
+
+        // We don't @Cleanup these — closing them via the async API is the 
test.
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("async-close-sub")
+                .subscribe();
+
+        producer.async().close().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+        consumer.async().close().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
new file mode 100644
index 00000000000..0cd3b78c8a4
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Basic end-to-end coverage for {@link CheckpointConsumer}: the unmanaged reader-style
+ * API used by connector frameworks (Flink, Spark) — specifically the start-position
+ * sentinels (earliest / latest), checkpoint roundtrip via {@link Checkpoint#toByteArray()},
+ * resume from a saved checkpoint, and {@link CheckpointConsumer#seek(Checkpoint)}.
+ *
+ * <p>All scenarios use a single-segment scalable topic to keep the focus on the
+ * consumer surface itself; cross-segment position-vector behavior lives in the
+ * scalable-topic suites.
+ */
+public class V5CheckpointConsumerBasicTest extends V5ClientBaseTest {
+
+    @Test
+    public void testReadFromEarliest() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value("msg-" + i).send();
+        }
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = 
v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected message " + i);
+            assertEquals(msg.value(), "msg-" + i, "out-of-order at index " + 
i);
+        }
+        assertNull(consumer.receive(Duration.ofMillis(200)),
+                "no extra messages after draining");
+    }
+
+    @Test
+    public void testReadFromLatestSkipsExistingMessages() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Pre-existing data — should not be visible to a LATEST consumer.
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value("pre-" + i).send();
+        }
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = 
v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.latest())
+                .create();
+
+        // Idle until we publish more.
+        assertNull(consumer.receive(Duration.ofMillis(200)),
+                "LATEST consumer must not see pre-existing messages");
+
+        for (int i = 0; i < 3; i++) {
+            producer.newMessage().value("post-" + i).send();
+        }
+        for (int i = 0; i < 3; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected post-" + i);
+            assertEquals(msg.value(), "post-" + i);
+        }
+    }
+
+    @Test
+    public void testCheckpointAndResume() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 10; i++) {
+            producer.newMessage().value("msg-" + i).send();
+        }
+
+        // Read 5, take a checkpoint, close.
+        Checkpoint saved;
+        {
+            @Cleanup
+            CheckpointConsumer<String> first = 
v5Client.newCheckpointConsumer(Schema.string())
+                    .topic(topic)
+                    .startPosition(Checkpoint.earliest())
+                    .create();
+            for (int i = 0; i < 5; i++) {
+                Message<String> msg = first.receive(Duration.ofSeconds(5));
+                assertNotNull(msg);
+                assertEquals(msg.value(), "msg-" + i);
+            }
+            saved = first.checkpoint();
+            assertNotNull(saved, "checkpoint() must return a non-null 
position");
+        }
+
+        // Reopen using the saved checkpoint — should pick up at msg-5.
+        @Cleanup
+        CheckpointConsumer<String> resumed = 
v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(saved)
+                .create();
+        for (int i = 5; i < 10; i++) {
+            Message<String> msg = resumed.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected msg-" + i);
+            assertEquals(msg.value(), "msg-" + i,
+                    "resume from checkpoint delivered the wrong message");
+        }
+        assertNull(resumed.receive(Duration.ofMillis(200)),
+                "no extra messages after resuming through the tail");
+    }
+
+    /**
+     * Verifies that a checkpoint survives {@code toByteArray()} / {@code fromByteArray()}:
+     * a consumer started from the deserialized checkpoint must resume at the first message
+     * that was not read before the checkpoint was taken.
+     *
+     * <p>The three reads before the checkpoint are asserted. If one of them timed out
+     * unnoticed, the checkpoint would sit at an earlier position and the test would fail
+     * later with a misleading resume error.
+     */
+    @Test
+    public void testCheckpointSerializationRoundtrip() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 6; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+
+        // Read 3 then save a checkpoint as bytes, simulating external storage.
+        byte[] savedBytes;
+        {
+            @Cleanup
+            CheckpointConsumer<String> first = v5Client.newCheckpointConsumer(Schema.string())
+                    .topic(topic)
+                    .startPosition(Checkpoint.earliest())
+                    .create();
+            for (int i = 0; i < 3; i++) {
+                Message<String> msg = first.receive(Duration.ofSeconds(5));
+                assertNotNull(msg, "expected v-" + i + " before taking the checkpoint");
+                assertEquals(msg.value(), "v-" + i);
+            }
+            savedBytes = first.checkpoint().toByteArray();
+            assertNotNull(savedBytes);
+        }
+
+        // Reopen by deserializing the checkpoint bytes — must resume at index 3.
+        Checkpoint restored = Checkpoint.fromByteArray(savedBytes);
+        @Cleanup
+        CheckpointConsumer<String> resumed = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(restored)
+                .create();
+        List<String> received = new ArrayList<>();
+        for (int i = 3; i < 6; i++) {
+            Message<String> msg = resumed.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected v-" + i);
+            received.add(msg.value());
+        }
+        assertEquals(received, List.of("v-3", "v-4", "v-5"));
+    }
+
+    /**
+     * Verifies that {@code seek(checkpoint)} rewinds an open consumer: after reading past a
+     * snapshot and seeking back to it, the messages that followed the snapshot are delivered
+     * again in order.
+     *
+     * <p>{@code receive(timeout)} returns null when nothing arrives in time, so every
+     * result is asserted non-null before it is dereferenced. A timeout then fails with a
+     * labelled assertion instead of a NullPointerException.
+     */
+    @Test
+    public void testSeekRewindsToEarlierCheckpoint() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 6; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        // Read 3, snapshot, read 3 more, then seek back to the snapshot — should
+        // re-deliver the second batch.
+        for (int i = 0; i < 3; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected v-" + i + " before the checkpoint");
+            assertEquals(msg.value(), "v-" + i);
+        }
+        Checkpoint mark = consumer.checkpoint();
+        assertNotNull(mark, "checkpoint() must return a non-null position");
+        for (int i = 3; i < 6; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected v-" + i + " after the checkpoint");
+            assertEquals(msg.value(), "v-" + i);
+        }
+
+        consumer.seek(mark);
+        for (int i = 3; i < 6; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "seek did not redeliver message v-" + i);
+            assertEquals(msg.value(), "v-" + i);
+        }
+    }
+
+    /** On a topic with no traffic, {@code receive(timeout)} must return null. */
+    @Test
+    public void testReceiveTimeoutReturnsNullWhenNoMessages() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        Duration shortTimeout = Duration.ofMillis(200);
+        assertNull(consumer.receive(shortTimeout), "receive(timeout) must return null on idle topic");
+    }
+
+    /** {@code topic()} must return the topic name the consumer was created with. */
+    @Test
+    public void testTopicAccessor() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        String reportedTopic = consumer.topic();
+        assertEquals(reportedTopic, topic);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
index 586d654743f..f658294d49e 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.testng.annotations.AfterClass;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 
 /**
  * Base class for V5 client end-to-end tests.
@@ -30,27 +32,54 @@ import org.testng.annotations.AfterMethod;
  * <p>Extends {@link SharedPulsarBaseTest} (one shared in-memory broker per 
JVM, fresh
  * namespace per test method) and adds:
  * <ul>
- *   <li>{@link #newV5Client()} — V5 PulsarClient against the shared 
broker.</li>
- *   <li>{@link #newScalableTopic(int)} — creates a {@code topic://...} 
scalable topic with the
- *       given number of initial segments and returns its name.</li>
- *   <li>Auto-tracking of clients/producers/consumers created during a test 
method, closed in
- *       {@link #closeV5Resources()} after the test.</li>
+ *   <li>{@link #v5Client} — a shared V5 PulsarClient initialized once per 
test class
+ *       (mirrors the v4 {@code pulsarClient} field on the parent). Most tests 
should
+ *       just use this.</li>
+ *   <li>{@link #newV5Client()} — for the rare case where a test needs its own
+ *       dedicated client (e.g., to exercise client-lifecycle behavior). 
Tracked for
+ *       cleanup automatically.</li>
+ *   <li>{@link #newScalableTopic(int)} — creates a {@code topic://...} 
scalable topic
+ *       with the given number of initial segments and returns its name.</li>
  * </ul>
+ *
+ * <p>Tests should prefer Lombok's {@code @Cleanup} on local producer / 
consumer
+ * variables for resource lifecycle. {@link #track(AutoCloseable)} is 
available for
+ * cases where {@code @Cleanup} doesn't fit (e.g., resources stored in fields).
  */
 public abstract class V5ClientBaseTest extends SharedPulsarBaseTest {
 
+    /** Shared V5 client. Initialized in {@code @BeforeClass}, closed in 
{@code @AfterClass}. */
+    protected PulsarClient v5Client;
+
     private final List<AutoCloseable> v5Resources = new ArrayList<>();
 
+    @BeforeClass(alwaysRun = true)
+    public void setupSharedV5Client() throws Exception {
+        v5Client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .build();
+    }
+
+    /** Closes the shared {@link #v5Client}, if one was created, and clears the field. */
+    @AfterClass(alwaysRun = true)
+    public void closeSharedV5Client() throws Exception {
+        if (v5Client == null) {
+            // Setup never produced a client; nothing to release.
+            return;
+        }
+        v5Client.close();
+        v5Client = null;
+    }
+
     /**
-     * Build a fresh V5 PulsarClient connected to the shared cluster's binary 
service URL.
-     * The returned client is registered for automatic close in {@link 
#closeV5Resources()}.
+     * Build a fresh V5 PulsarClient connected to the shared cluster. The 
returned
+     * client is registered for automatic close after the test method.
+     *
+     * <p>Most tests should use the shared {@link #v5Client} instead — only 
reach for this
+     * when the test specifically needs an isolated client.
      */
     protected PulsarClient newV5Client() throws Exception {
         PulsarClient client = PulsarClient.builder()
                 .serviceUrl(getBrokerServiceUrl())
                 .build();
-        track(client);
-        return client;
+        return track(client);
     }
 
     /**
@@ -66,6 +95,7 @@ public abstract class V5ClientBaseTest extends 
SharedPulsarBaseTest {
 
     /**
      * Register an arbitrary {@link AutoCloseable} for automatic close after 
the test.
+     * Prefer {@code @Cleanup} on local variables when possible.
      */
     protected <T extends AutoCloseable> T track(T closeable) {
         v5Resources.add(closeable);
@@ -74,7 +104,7 @@ public abstract class V5ClientBaseTest extends 
SharedPulsarBaseTest {
 
     @AfterMethod(alwaysRun = true)
     public void closeV5Resources() {
-        // Close in reverse order: consumers/producers before the client they 
belong to.
+        // Close in reverse order: nested resources before the things they 
hang off of.
         for (int i = v5Resources.size() - 1; i >= 0; i--) {
             AutoCloseable c = v5Resources.get(i);
             try {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerBasicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerBasicTest.java
new file mode 100644
index 00000000000..631b245e5ec
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerBasicTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Basic end-to-end coverage for {@link QueueConsumer}: the simple CRUD-shaped behaviors
+ * a user expects from a Shared-style scalable consumer (multi-message ack, negative-ack
+ * redelivery, receive timeout, keyed-message metadata roundtrip, accessors).
+ *
+ * <p>All scenarios use a single-segment scalable topic to keep these tests focused on the
+ * consumer surface itself; segment split / merge / multi-consumer-rebalance scenarios live
+ * in the dedicated scalable-topic test suites.
+ */
+public class V5QueueConsumerBasicTest extends V5ClientBaseTest {
+
+    /**
+     * Publishes a batch of messages, then receives and individually acknowledges each one,
+     * expecting send order to be preserved and the subscription to be empty afterwards.
+     */
+    @Test
+    public void testProduceAndAckMany() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("basic-sub")
+                .subscribe();
+
+        int numMessages = 50;
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value("msg-" + i).send();
+        }
+
+        Duration receiveTimeout = Duration.ofSeconds(5);
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> received = consumer.receive(receiveTimeout);
+            assertNotNull(received, "expected message " + i + " but receive timed out");
+            // One segment, one consumer, Shared subscription: delivery follows send order.
+            assertEquals(received.value(), "msg-" + i,
+                    "out-of-order delivery at index " + i);
+            consumer.acknowledge(received.id());
+        }
+
+        // Everything was acked, so the subscription must now be empty.
+        Message<String> extra = consumer.receive(Duration.ofMillis(200));
+        assertNull(extra, "unexpected extra message after acking all sent");
+    }
+
+    /** A negatively acknowledged message must be delivered again to the same consumer. */
+    @Test
+    public void testNegativeAckCausesRedelivery() throws Exception {
+        String topic = newScalableTopic(1);
+        // Short redelivery delay (the default is ~60s) so the test does not have to wait.
+        Duration redeliveryDelay = Duration.ofMillis(200);
+        Duration receiveTimeout = Duration.ofSeconds(10);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("nack-sub")
+                // BackoffPolicy.fixed(...) uses multiplier=1.0, which the underlying v4
+                // MultiplierRedeliveryBackoff rejects (it requires > 1). Exponential with
+                // initial == max makes the cap pin the delay at 200ms instead.
+                .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(redeliveryDelay, redeliveryDelay))
+                .subscribe();
+
+        producer.newMessage().value("once").send();
+
+        Message<String> firstDelivery = consumer.receive(receiveTimeout);
+        assertNotNull(firstDelivery);
+        assertEquals(firstDelivery.value(), "once");
+        consumer.negativeAcknowledge(firstDelivery.id());
+
+        Message<String> secondDelivery = consumer.receive(receiveTimeout);
+        assertNotNull(secondDelivery, "negativeAcknowledge did not trigger redelivery");
+        assertEquals(secondDelivery.value(), "once");
+        consumer.acknowledge(secondDelivery.id());
+    }
+
+    /** On a topic with no producer, {@code receive(timeout)} must return null. */
+    @Test
+    public void testReceiveTimeoutReturnsNullWhenNoMessages() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("idle-sub")
+                .subscribe();
+
+        // Nothing is ever published here: a bounded receive has to come back with null
+        // instead of blocking forever or throwing.
+        Duration shortTimeout = Duration.ofMillis(200);
+        assertNull(consumer.receive(shortTimeout), "receive with timeout must return null on idle topic");
+    }
+
+    @Test
+    public void testKeyedMessagesPreserveKeyAndProperties() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("keys-sub")
+                .subscribe();
+
+        Set<String> sentKeys = new HashSet<>();
+        Map<String, Map<String, String>> sentProps = new java.util.HashMap<>();
+        for (int i = 0; i < 5; i++) {
+            String key = "key-" + i;
+            Map<String, String> props = Map.of("idx", String.valueOf(i), 
"tag", "v5-test");
+            producer.newMessage()
+                    .key(key)
+                    .value("payload-" + i)
+                    .properties(props)
+                    .send();
+            sentKeys.add(key);
+            sentProps.put(key, props);
+        }
+
+        Set<String> seen = new HashSet<>();
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+            assertNotNull(msg);
+            assertTrue(msg.key().isPresent(), "message key must be 
propagated");
+            String key = msg.key().get();
+            seen.add(key);
+            assertEquals(msg.properties(), sentProps.get(key),
+                    "properties must roundtrip for key " + key);
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(seen, sentKeys, "consumer saw a different set of keys 
than producer sent");
+    }
+
+    @Test
+    public void testTopicSubscriptionAndConsumerNameAccessors() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("accessors-sub")
+                .consumerName("accessors-consumer")
+                .subscribe();
+
+        assertEquals(consumer.topic(), topic);
+        assertEquals(consumer.subscription(), "accessors-sub");
+        assertEquals(consumer.consumerName(), "accessors-consumer");
+
+        // No consumerName set => the accessor may return null or a generated name,
+        // but it must never return an empty string.
+        @Cleanup
+        QueueConsumer<String> defaultName = v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("default-name-sub")
+                .subscribe();
+        assertTrue(defaultName.consumerName() == null || !defaultName.consumerName().isEmpty(),
+                "consumerName should be either null or a non-empty generated name");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerBasicTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerBasicTest.java
new file mode 100644
index 00000000000..721bebd5426
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerBasicTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Basic end-to-end coverage for {@link StreamConsumer}: ordered cumulative-ack
+ * receive flow, {@code receiveMulti}, idle receive timeout, and the consumer
+ * accessors. All scenarios use a single-segment scalable topic.
+ *
+ * <p>Multi-segment cumulative-ack-with-position-vector behavior and consumer
+ * rebalance scenarios live in dedicated scalable-topic suites.
+ */
+public class V5StreamConsumerBasicTest extends V5ClientBaseTest {
+
+    @Test
+    public void testProduceAndCumulativeAck() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("stream-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int numMessages = 50;
+        for (int i = 0; i < numMessages; i++) {
+            producer.newMessage().value("msg-" + i).send();
+        }
+
+        // Receive all and ack cumulatively at the last id only.
+        MessageId last = null;
+        for (int i = 0; i < numMessages; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected message " + i + " but receive timed 
out");
+            // Single segment: stream consumer must preserve send order.
+            assertEquals(msg.value(), "msg-" + i,
+                    "out-of-order delivery at index " + i);
+            last = msg.id();
+        }
+        assertNotNull(last);
+        consumer.acknowledgeCumulative(last);
+
+        // Nothing should redeliver after a cumulative ack of the last id.
+        assertNull(consumer.receive(Duration.ofMillis(200)),
+                "stream consumer redelivered a message after cumulative ack of 
last id");
+    }
+
+    @Test
+    public void testReceiveMultiReturnsBatch() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("multi-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        int produced = 10;
+        for (int i = 0; i < produced; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+
+        // Ask for up to 20 with a short timeout. Only 10 are available, so 
receiveMulti
+        // waits until the timeout fires (or the cap is hit). Keep the timeout 
small so
+        // the test stays fast.
+        Messages<String> batch = consumer.receiveMulti(20, 
Duration.ofMillis(500));
+        assertNotNull(batch);
+        assertTrue(batch.count() >= 1 && batch.count() <= produced,
+                "unexpected batch size: " + batch.count());
+        assertNotNull(batch.lastId(), "lastId() must be set on a non-empty 
batch");
+
+        // Iterating yields exactly batch.count() messages.
+        int seen = 0;
+        for (Message<String> ignored : batch) {
+            seen++;
+        }
+        assertEquals(seen, batch.count(),
+                "iteration count must match Messages.count()");
+        consumer.acknowledgeCumulative(batch.lastId());
+    }
+
+    @Test
+    public void testReceiveTimeoutReturnsNullWhenNoMessages() throws Exception 
{
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("idle-sub")
+                .subscribe();
+
+        Message<String> msg = consumer.receive(Duration.ofMillis(200));
+        assertNull(msg, "receive(timeout) must return null on idle topic");
+    }
+
+    @Test
+    public void testTopicSubscriptionAndConsumerNameAccessors() throws 
Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("accessors-sub")
+                .consumerName("accessors-consumer")
+                .subscribe();
+
+        assertEquals(consumer.topic(), topic);
+        assertEquals(consumer.subscription(), "accessors-sub");
+        assertEquals(consumer.consumerName(), "accessors-consumer");
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 534f885663d..34413a8153f 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -122,7 +122,7 @@ final class ScalableCheckpointConsumer<T> implements 
CheckpointConsumer<T>, DagW
     @Override
     public Message<T> receive() throws PulsarClientException {
         try {
-            return messageQueue.take();
+            return advanceCheckpoint(messageQueue.take());
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarClientException("Receive interrupted", e);
@@ -132,7 +132,7 @@ final class ScalableCheckpointConsumer<T> implements 
CheckpointConsumer<T>, DagW
     @Override
     public Message<T> receive(Duration timeout) throws PulsarClientException {
         try {
-            return messageQueue.poll(timeout.toMillis(), 
TimeUnit.MILLISECONDS);
+            return advanceCheckpoint(messageQueue.poll(timeout.toMillis(), 
TimeUnit.MILLISECONDS));
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new PulsarClientException("Receive interrupted", e);
@@ -154,16 +154,38 @@ final class ScalableCheckpointConsumer<T> implements 
CheckpointConsumer<T>, DagW
                 if (msg == null) {
                     break;
                 }
-                batch.add(msg);
+                batch.add(advanceCheckpoint(msg));
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new PulsarClientException("Receive interrupted", e);
             }
-            messageQueue.drainTo(batch, maxMessages - batch.size());
+            // Drain whatever else is immediately ready up to maxMessages.
+            List<Message<T>> drained = new ArrayList<>();
+            messageQueue.drainTo(drained, maxMessages - batch.size());
+            for (Message<T> drainedMsg : drained) {
+                batch.add(advanceCheckpoint(drainedMsg));
+            }
         }
         return new MessagesV5<>(batch);
     }
 
+    /**
+     * Update the checkpoint position for the segment this message belongs to. 
Called as
+     * messages cross the boundary from the wire-buffer to the application — 
that's the
+     * point at which a subsequent {@link #checkpoint()} should reflect "I 
have processed
+     * this message", so a {@link #seek(Checkpoint)} back to that checkpoint 
redelivers
+     * everything after it.
+     *
+     * <p>{@code msg} may be null (the timed receive's poll timed out); returns it 
unchanged so the
+     * caller can pass through the receive result without an extra null-check.
+     */
+    private Message<T> advanceCheckpoint(Message<T> msg) {
+        if (msg != null && msg.id() instanceof MessageIdV5 id) {
+            lastReceivedPositions.put(id.segmentId(), id.v4MessageId());
+        }
+        return msg;
+    }
+
     @Override
     public Checkpoint checkpoint() {
         Map<Long, org.apache.pulsar.client.api.MessageId> positions = new 
HashMap<>(lastReceivedPositions);
@@ -348,7 +370,12 @@ final class ScalableCheckpointConsumer<T> implements 
CheckpointConsumer<T>, DagW
 
     private void startReadLoop(Reader<T> reader, long segmentId) {
         reader.readNextAsync().thenAccept(v4Msg -> {
-            lastReceivedPositions.put(segmentId, v4Msg.getMessageId());
+            // Don't advance the checkpoint here — the read loop pre-fetches 
into the
+            // queue, so updating per-segment positions on wire-receive would 
skip
+            // messages that the application hasn't pulled yet (e.g., a 
checkpoint()
+            // taken right after the app received message N could already 
point past
+            // N+1 if the read loop got ahead). The advance happens in 
receive() /
+            // receiveMulti() instead, where the message crosses into 
application code.
             messageQueue.add(new MessageV5<>(v4Msg, segmentId));
             if (!closed) {
                 startReadLoop(reader, segmentId);
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 4a1dae8bcdf..1bbaf8905cb 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -317,9 +317,13 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
     private CompletableFuture<org.apache.pulsar.client.api.Consumer<T>> 
createSegmentConsumerAsync(
             ActiveSegment segment) {
         PulsarClientImpl v4Client = client.v4Client();
-        var segConf = new 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData<T>();
+        // Clone the user-facing consumer config so per-segment consumers 
inherit every
+        // builder knob (receiverQueueSize / ackTimeout / nack backoff / DLQ / 
...) and not
+        // just the few fields we explicitly carry over.
+        var segConf = consumerConf.clone();
+        segConf.getTopicNames().clear();
+        segConf.setTopicsPattern(null);
         segConf.getTopicNames().add(segment.segmentTopicName());
-        segConf.setSubscriptionName(subscriptionName);
         segConf.setSubscriptionType(SubscriptionType.Shared);
         if (consumerConf.getConsumerName() != null) {
             segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + 
segment.segmentId());
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index 5137ce0462f..15b969d60c1 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -305,9 +305,12 @@ final class ScalableStreamConsumer<T> implements 
StreamConsumer<T>, DagWatchClie
     private CompletableFuture<org.apache.pulsar.client.api.Consumer<T>> 
createSegmentConsumerAsync(
             ActiveSegment segment) {
         PulsarClientImpl v4Client = client.v4Client();
-        var segConf = new 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData<T>();
+        // Clone so per-segment consumers inherit every builder knob the user 
set
+        // (ackTimeout, readCompacted, replicateSubscriptionState, encryption, 
...).
+        var segConf = consumerConf.clone();
+        segConf.getTopicNames().clear();
+        segConf.setTopicsPattern(null);
         segConf.getTopicNames().add(segment.segmentTopicName());
-        segConf.setSubscriptionName(subscriptionName);
         segConf.setSubscriptionType(SubscriptionType.Exclusive);
         if (consumerConf.getConsumerName() != null) {
             segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + 
segment.segmentId());


Reply via email to