This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new f92a4a059be [feat] PIP-468: Basic end-to-end tests for V5
Queue/Stream/Checkpoint consumers + async APIs (#25587)
f92a4a059be is described below
commit f92a4a059bebe481f4d35a1ba7c7cb97b420a183
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Apr 28 08:32:44 2026 -0700
[feat] PIP-468: Basic end-to-end tests for V5 Queue/Stream/Checkpoint
consumers + async APIs (#25587)
---
.../pulsar/client/api/v5/V5AsyncApisTest.java | 237 +++++++++++++++++++
.../api/v5/V5CheckpointConsumerBasicTest.java | 254 +++++++++++++++++++++
.../pulsar/client/api/v5/V5ClientBaseTest.java | 50 +++-
.../client/api/v5/V5QueueConsumerBasicTest.java | 194 ++++++++++++++++
.../client/api/v5/V5StreamConsumerBasicTest.java | 147 ++++++++++++
.../client/impl/v5/ScalableCheckpointConsumer.java | 37 ++-
.../client/impl/v5/ScalableQueueConsumer.java | 8 +-
.../client/impl/v5/ScalableStreamConsumer.java | 7 +-
8 files changed, 915 insertions(+), 19 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
new file mode 100644
index 00000000000..54164b23339
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AsyncApisTest.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.async.AsyncCheckpointConsumer;
+import org.apache.pulsar.client.api.v5.async.AsyncProducer;
+import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer;
+import org.apache.pulsar.client.api.v5.async.AsyncStreamConsumer;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the V5 async views: {@code Producer.async()},
+ * {@code QueueConsumer.async()}, {@code StreamConsumer.async()}, and
+ * {@code CheckpointConsumer.async()}.
+ *
+ * <p>Each scenario verifies that the future returned by an async call (a) eventually
+ * completes, (b) carries the right value (MessageId / Message / Checkpoint), and (c)
+ * doesn't block the calling thread synchronously beyond what the wire requires.
+ *
+ * <p>All scenarios use a single-segment scalable topic — multi-segment / cross-segment
+ * async behavior lives in the dedicated scalable suites.
+ */
+public class V5AsyncApisTest extends V5ClientBaseTest {
+
+ private static final Duration AWAIT = Duration.ofSeconds(10);
+
+ @Test
+ public void testAsyncProducerSendAndFlush() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ // Establish the subscription cursor BEFORE producing so default-LATEST
+ // initial position picks up everything we send below.
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("async-flush-sub")
+ .subscribe();
+
+ AsyncProducer<String> async = producer.async();
+
+ // Issue N sends without blocking on each one; collect the futures.
+ int n = 20;
+ List<CompletableFuture<MessageId>> sendFutures = new ArrayList<>();
+ for (int i = 0; i < n; i++) {
+ sendFutures.add(async.newMessage().value("msg-" + i).send());
+ }
+
+ // flush() must complete only after all in-flight sends have been
acknowledged.
+ async.flush().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+ for (int i = 0; i < n; i++) {
+ assertTrue(sendFutures.get(i).isDone(),
+ "send future " + i + " must be done after flush()");
+ assertNotNull(sendFutures.get(i).getNow(null),
+ "send future " + i + " must carry a non-null MessageId");
+ }
+
+ // Drain to confirm the messages actually landed in order.
+ for (int i = 0; i < n; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected msg-" + i);
+ assertEquals(msg.value(), "msg-" + i);
+ consumer.acknowledge(msg.id());
+ }
+ }
+
+ @Test
+ public void testAsyncProducerSendCarriesMessageId() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ MessageId id =
producer.async().newMessage().value("hello-async").send()
+ .get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+ assertNotNull(id, "sendAsync must complete with a non-null MessageId");
+ }
+
+ @Test
+ public void testAsyncQueueConsumerReceiveAndAck() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("async-q-sub")
+ .subscribe();
+
+ AsyncQueueConsumer<String> async = consumer.async();
+
+ producer.newMessage().value("hi").send();
+
+ Message<String> msg = async.receive().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ assertEquals(msg.value(), "hi");
+ // acknowledge() is fire-and-forget on the async view; verify no
message remains.
+ async.acknowledge(msg.id());
+
+ // Nothing should redeliver after a successful ack.
+ assertNull(consumer.receive(Duration.ofMillis(200)),
+ "ack via async view did not stick");
+ }
+
+ @Test
+ public void testAsyncStreamConsumerReceiveAndCumulativeAck() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("async-stream-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ AsyncStreamConsumer<String> async = consumer.async();
+
+ int n = 5;
+ for (int i = 0; i < n; i++) {
+ producer.newMessage().value("m-" + i).send();
+ }
+
+ MessageId last = null;
+ for (int i = 0; i < n; i++) {
+ Message<String> msg = async.receive().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
+ assertNotNull(msg);
+ assertEquals(msg.value(), "m-" + i, "async stream consumer out of
order at " + i);
+ last = msg.id();
+ }
+ async.acknowledgeCumulative(last);
+
+ assertNull(consumer.receive(Duration.ofMillis(200)),
+ "cumulative ack via async view did not stick");
+ }
+
+ @Test
+ public void testAsyncCheckpointConsumerCheckpointAndSeek() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 6; i++) {
+ producer.newMessage().value("v-" + i).send();
+ }
+
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ AsyncCheckpointConsumer<String> async = consumer.async();
+
+ // Read 3, snapshot via async, read 3 more, then async-seek back.
+ for (int i = 0; i < 3; i++) {
+ Message<String> msg = async.receive().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
+ assertEquals(msg.value(), "v-" + i);
+ }
+ Checkpoint mark = async.checkpoint().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
+ assertNotNull(mark, "async checkpoint must complete with a non-null
position");
+
+ for (int i = 3; i < 6; i++) {
+ Message<String> msg = async.receive().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
+ assertEquals(msg.value(), "v-" + i);
+ }
+
+ async.seek(mark).get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+ Set<String> redelivered = new HashSet<>();
+ for (int i = 0; i < 3; i++) {
+ Message<String> msg = async.receive().get(AWAIT.toMillis(),
TimeUnit.MILLISECONDS);
+ redelivered.add(msg.value());
+ }
+ assertEquals(redelivered, Set.of("v-3", "v-4", "v-5"),
+ "async seek did not redeliver the post-checkpoint window");
+ }
+
+ @Test
+ public void testAsyncCloseCompletes() throws Exception {
+ String topic = newScalableTopic(1);
+
+ // We don't @Cleanup these — closing them via the async API is the
test.
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("async-close-sub")
+ .subscribe();
+
+ producer.async().close().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+ consumer.async().close().get(AWAIT.toMillis(), TimeUnit.MILLISECONDS);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
new file mode 100644
index 00000000000..0cd3b78c8a4
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5CheckpointConsumerBasicTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Basic end-to-end coverage for {@link CheckpointConsumer}: the unmanaged reader-style
+ * API used by connector frameworks (Flink, Spark) — specifically the start-position
+ * sentinels (earliest / latest), checkpoint roundtrip via {@link Checkpoint#toByteArray()},
+ * resume from a saved checkpoint, and {@link CheckpointConsumer#seek(Checkpoint)}.
+ *
+ * <p>All scenarios use a single-segment scalable topic to keep the focus on the
+ * consumer surface itself; cross-segment position-vector behavior lives in the
+ * scalable-topic suites.
+ */
+public class V5CheckpointConsumerBasicTest extends V5ClientBaseTest {
+
+ @Test
+ public void testReadFromEarliest() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value("msg-" + i).send();
+ }
+
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected message " + i);
+ assertEquals(msg.value(), "msg-" + i, "out-of-order at index " +
i);
+ }
+ assertNull(consumer.receive(Duration.ofMillis(200)),
+ "no extra messages after draining");
+ }
+
+ @Test
+ public void testReadFromLatestSkipsExistingMessages() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Pre-existing data — should not be visible to a LATEST consumer.
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value("pre-" + i).send();
+ }
+
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.latest())
+ .create();
+
+ // Idle until we publish more.
+ assertNull(consumer.receive(Duration.ofMillis(200)),
+ "LATEST consumer must not see pre-existing messages");
+
+ for (int i = 0; i < 3; i++) {
+ producer.newMessage().value("post-" + i).send();
+ }
+ for (int i = 0; i < 3; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected post-" + i);
+ assertEquals(msg.value(), "post-" + i);
+ }
+ }
+
+ @Test
+ public void testCheckpointAndResume() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 10; i++) {
+ producer.newMessage().value("msg-" + i).send();
+ }
+
+ // Read 5, take a checkpoint, close.
+ Checkpoint saved;
+ {
+ @Cleanup
+ CheckpointConsumer<String> first =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = first.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ assertEquals(msg.value(), "msg-" + i);
+ }
+ saved = first.checkpoint();
+ assertNotNull(saved, "checkpoint() must return a non-null
position");
+ }
+
+ // Reopen using the saved checkpoint — should pick up at msg-5.
+ @Cleanup
+ CheckpointConsumer<String> resumed =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(saved)
+ .create();
+ for (int i = 5; i < 10; i++) {
+ Message<String> msg = resumed.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected msg-" + i);
+ assertEquals(msg.value(), "msg-" + i,
+ "resume from checkpoint delivered the wrong message");
+ }
+ assertNull(resumed.receive(Duration.ofMillis(200)),
+ "no extra messages after resuming through the tail");
+ }
+
+    /**
+     * Verifies that a checkpoint survives serialization: the bytes produced by
+     * {@link Checkpoint#toByteArray()} are restored with
+     * {@code Checkpoint.fromByteArray(...)}, and a consumer started from the restored
+     * checkpoint resumes at the first message after the saved position.
+     */
+    @Test
+    public void testCheckpointSerializationRoundtrip() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 6; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+
+        // Read 3 then save a checkpoint as bytes, simulating external storage.
+        byte[] savedBytes;
+        {
+            @Cleanup
+            CheckpointConsumer<String> first = v5Client.newCheckpointConsumer(Schema.string())
+                    .topic(topic)
+                    .startPosition(Checkpoint.earliest())
+                    .create();
+            for (int i = 0; i < 3; i++) {
+                // receive(timeout) yields null on timeout; assert every read so the
+                // checkpoint below is known to sit right after v-2.
+                Message<String> msg = first.receive(Duration.ofSeconds(5));
+                assertNotNull(msg, "expected v-" + i + " before taking the checkpoint");
+                assertEquals(msg.value(), "v-" + i);
+            }
+            savedBytes = first.checkpoint().toByteArray();
+            assertNotNull(savedBytes);
+        }
+
+        // Reopen by deserializing the checkpoint bytes — must resume at index 3.
+        Checkpoint restored = Checkpoint.fromByteArray(savedBytes);
+        @Cleanup
+        CheckpointConsumer<String> resumed = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(restored)
+                .create();
+        List<String> received = new ArrayList<>();
+        for (int i = 3; i < 6; i++) {
+            Message<String> msg = resumed.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected v-" + i);
+            received.add(msg.value());
+        }
+        assertEquals(received, List.of("v-3", "v-4", "v-5"));
+    }
+
+    /**
+     * Verifies that {@link CheckpointConsumer#seek(Checkpoint)} rewinds to an earlier
+     * checkpoint: the messages read after the checkpoint was taken are delivered
+     * again, in order.
+     */
+    @Test
+    public void testSeekRewindsToEarlierCheckpoint() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 6; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+
+        @Cleanup
+        CheckpointConsumer<String> consumer = v5Client.newCheckpointConsumer(Schema.string())
+                .topic(topic)
+                .startPosition(Checkpoint.earliest())
+                .create();
+
+        // Read 3, snapshot, read 3 more, then seek back to the snapshot — should
+        // re-deliver the second batch. Each read is null-checked so a receive
+        // timeout fails with a clear message instead of a NullPointerException.
+        for (int i = 0; i < 3; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected v-" + i + " before the checkpoint");
+            assertEquals(msg.value(), "v-" + i);
+        }
+        Checkpoint mark = consumer.checkpoint();
+        for (int i = 3; i < 6; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected v-" + i + " after the checkpoint");
+            assertEquals(msg.value(), "v-" + i);
+        }
+
+        consumer.seek(mark);
+        for (int i = 3; i < 6; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "seek did not redeliver message v-" + i);
+            assertEquals(msg.value(), "v-" + i);
+        }
+    }
+
+ @Test
+ public void testReceiveTimeoutReturnsNullWhenNoMessages() throws Exception
{
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ Message<String> msg = consumer.receive(Duration.ofMillis(200));
+ assertNull(msg, "receive(timeout) must return null on idle topic");
+ }
+
+ @Test
+ public void testTopicAccessor() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ CheckpointConsumer<String> consumer =
v5Client.newCheckpointConsumer(Schema.string())
+ .topic(topic)
+ .startPosition(Checkpoint.earliest())
+ .create();
+
+ assertEquals(consumer.topic(), topic);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
index 586d654743f..f658294d49e 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
/**
* Base class for V5 client end-to-end tests.
@@ -30,27 +32,54 @@ import org.testng.annotations.AfterMethod;
* <p>Extends {@link SharedPulsarBaseTest} (one shared in-memory broker per JVM, fresh
* namespace per test method) and adds:
* <ul>
- * <li>{@link #newV5Client()} — V5 PulsarClient against the shared broker.</li>
- * <li>{@link #newScalableTopic(int)} — creates a {@code topic://...} scalable topic with the
- * given number of initial segments and returns its name.</li>
- * <li>Auto-tracking of clients/producers/consumers created during a test method, closed in
- * {@link #closeV5Resources()} after the test.</li>
+ * <li>{@link #v5Client} — a shared V5 PulsarClient initialized once per test class
+ * (mirrors the v4 {@code pulsarClient} field on the parent). Most tests should
+ * just use this.</li>
+ * <li>{@link #newV5Client()} — for the rare case where a test needs its own
+ * dedicated client (e.g., to exercise client-lifecycle behavior). Tracked for
+ * cleanup automatically.</li>
+ * <li>{@link #newScalableTopic(int)} — creates a {@code topic://...} scalable topic
+ * with the given number of initial segments and returns its name.</li>
* </ul>
+ *
+ * <p>Tests should prefer Lombok's {@code @Cleanup} on local producer / consumer
+ * variables for resource lifecycle. {@link #track(AutoCloseable)} is available for
+ * cases where {@code @Cleanup} doesn't fit (e.g., resources stored in fields).
*/
public abstract class V5ClientBaseTest extends SharedPulsarBaseTest {
+ /** Shared V5 client. Initialized in {@code @BeforeClass}, closed in
{@code @AfterClass}. */
+ protected PulsarClient v5Client;
+
private final List<AutoCloseable> v5Resources = new ArrayList<>();
+ @BeforeClass(alwaysRun = true)
+ public void setupSharedV5Client() throws Exception {
+ v5Client = PulsarClient.builder()
+ .serviceUrl(getBrokerServiceUrl())
+ .build();
+ }
+
+    /**
+     * Closes the shared {@link #v5Client} after the tests of the class have run.
+     * The field is cleared even when {@code close()} throws, so a failed close
+     * never leaves a stale client reference behind.
+     */
+    @AfterClass(alwaysRun = true)
+    public void closeSharedV5Client() throws Exception {
+        if (v5Client != null) {
+            try {
+                v5Client.close();
+            } finally {
+                v5Client = null;
+            }
+        }
+    }
+
/**
- * Build a fresh V5 PulsarClient connected to the shared cluster's binary
service URL.
- * The returned client is registered for automatic close in {@link
#closeV5Resources()}.
+ * Build a fresh V5 PulsarClient connected to the shared cluster. The
returned
+ * client is registered for automatic close after the test method.
+ *
+ * <p>Most tests should use the shared {@link #v5Client} instead — only
reach for this
+ * when the test specifically needs an isolated client.
*/
protected PulsarClient newV5Client() throws Exception {
PulsarClient client = PulsarClient.builder()
.serviceUrl(getBrokerServiceUrl())
.build();
- track(client);
- return client;
+ return track(client);
}
/**
@@ -66,6 +95,7 @@ public abstract class V5ClientBaseTest extends
SharedPulsarBaseTest {
/**
* Register an arbitrary {@link AutoCloseable} for automatic close after
the test.
+ * Prefer {@code @Cleanup} on local variables when possible.
*/
protected <T extends AutoCloseable> T track(T closeable) {
v5Resources.add(closeable);
@@ -74,7 +104,7 @@ public abstract class V5ClientBaseTest extends
SharedPulsarBaseTest {
@AfterMethod(alwaysRun = true)
public void closeV5Resources() {
- // Close in reverse order: consumers/producers before the client they
belong to.
+ // Close in reverse order: nested resources before the things they
hang off of.
for (int i = v5Resources.size() - 1; i >= 0; i--) {
AutoCloseable c = v5Resources.get(i);
try {
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerBasicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerBasicTest.java
new file mode 100644
index 00000000000..631b245e5ec
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5QueueConsumerBasicTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Basic end-to-end coverage for {@link QueueConsumer}: the basic behaviors
+ * a user expects from a Shared-style scalable consumer (multi-message ack, negative-ack
+ * redelivery, receive timeout, keyed-message metadata roundtrip, accessors).
+ *
+ * <p>All scenarios use a single-segment scalable topic to keep these tests focused on the
+ * consumer surface itself; segment split / merge / multi-consumer-rebalance scenarios live
+ * in the dedicated scalable-topic test suites.
+ */
+public class V5QueueConsumerBasicTest extends V5ClientBaseTest {
+
+ @Test
+ public void testProduceAndAckMany() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("basic-sub")
+ .subscribe();
+
+ int numMessages = 50;
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().value("msg-" + i).send();
+ }
+
+ for (int i = 0; i < numMessages; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected message " + i + " but receive timed
out");
+ // Single segment + single consumer + Shared sub: messages stay in
send order.
+ assertEquals(msg.value(), "msg-" + i,
+ "out-of-order delivery at index " + i);
+ consumer.acknowledge(msg.id());
+ }
+
+ // No more messages should be left in the subscription.
+ assertNull(consumer.receive(Duration.ofMillis(200)),
+ "unexpected extra message after acking all sent");
+ }
+
+ @Test
+ public void testNegativeAckCausesRedelivery() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ // Tight nack-redelivery (default is ~60s) so the test doesn't have to
wait.
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("nack-sub")
+ // BackoffPolicy.fixed(...) sets multiplier=1.0 which the
underlying v4
+ // MultiplierRedeliveryBackoff rejects (requires > 1); use
exponential with
+ // initial==max so the cap pins the delay at 200ms.
+ .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
+ Duration.ofMillis(200), Duration.ofMillis(200)))
+ .subscribe();
+
+ producer.newMessage().value("once").send();
+
+ Message<String> first = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(first);
+ assertEquals(first.value(), "once");
+ consumer.negativeAcknowledge(first.id());
+
+ Message<String> redelivered = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(redelivered, "negativeAcknowledge did not trigger
redelivery");
+ assertEquals(redelivered.value(), "once");
+ consumer.acknowledge(redelivered.id());
+ }
+
+ @Test
+ public void testReceiveTimeoutReturnsNullWhenNoMessages() throws Exception
{
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("idle-sub")
+ .subscribe();
+
+ // No producer, no traffic. receive() with a small timeout must return
null
+ // rather than block indefinitely or throw.
+ Message<String> msg = consumer.receive(Duration.ofMillis(200));
+ assertNull(msg, "receive with timeout must return null on idle topic");
+ }
+
+ @Test
+ public void testKeyedMessagesPreserveKeyAndProperties() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("keys-sub")
+ .subscribe();
+
+ Set<String> sentKeys = new HashSet<>();
+ Map<String, Map<String, String>> sentProps = new java.util.HashMap<>();
+ for (int i = 0; i < 5; i++) {
+ String key = "key-" + i;
+ Map<String, String> props = Map.of("idx", String.valueOf(i),
"tag", "v5-test");
+ producer.newMessage()
+ .key(key)
+ .value("payload-" + i)
+ .properties(props)
+ .send();
+ sentKeys.add(key);
+ sentProps.put(key, props);
+ }
+
+ Set<String> seen = new HashSet<>();
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(msg);
+ assertTrue(msg.key().isPresent(), "message key must be
propagated");
+ String key = msg.key().get();
+ seen.add(key);
+ assertEquals(msg.properties(), sentProps.get(key),
+ "properties must roundtrip for key " + key);
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(seen, sentKeys, "consumer saw a different set of keys
than producer sent");
+ }
+
+ @Test
+ public void testTopicSubscriptionAndConsumerNameAccessors() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("accessors-sub")
+ .consumerName("accessors-consumer")
+ .subscribe();
+
+ assertEquals(consumer.topic(), topic);
+ assertEquals(consumer.subscription(), "accessors-sub");
+ assertEquals(consumer.consumerName(), "accessors-consumer");
+
+ // No consumerName set => the accessor may return null or a generated name (typically
+ // assigned by the broker / client for diagnostics), but never an empty string.
+ @Cleanup
+ QueueConsumer<String> defaultName =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("default-name-sub")
+ .subscribe();
+ assertTrue(defaultName.consumerName() == null ||
!defaultName.consumerName().isEmpty(),
+ "consumerName should be either null or a non-empty generated
name");
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerBasicTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerBasicTest.java
new file mode 100644
index 00000000000..721bebd5426
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5StreamConsumerBasicTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Basic end-to-end coverage for {@link StreamConsumer}: ordered cumulative-ack
+ * receive flow, {@code receiveMulti}, idle receive timeout, and the consumer
+ * accessors. All scenarios use a single-segment scalable topic, created per
+ * test via {@code newScalableTopic(1)}.
+ *
+ * <p>Scenarios covered:
+ * <ul>
+ * <li>{@code testProduceAndCumulativeAck}: sends 50 messages, expects them back
+ * in send order, cumulatively acks only the last id, then expects a 200 ms
+ * receive to return null (no redelivery).</li>
+ * <li>{@code testReceiveMultiReturnsBatch}: sends 10 messages, asks for up to 20
+ * with a 500 ms timeout, and checks that the batch holds between 1 and 10
+ * messages, has a non-null {@code lastId()}, and iterates exactly
+ * {@code count()} times.</li>
+ * <li>{@code testReceiveTimeoutReturnsNullWhenNoMessages}: a 200 ms receive on a
+ * topic nothing was produced to returns null.</li>
+ * <li>{@code testTopicSubscriptionAndConsumerNameAccessors}: {@code topic()},
+ * {@code subscription()} and {@code consumerName()} return the values given
+ * to the builder.</li>
+ * </ul>
+ *
+ * <p>{@code v5Client} and {@code newScalableTopic} are not declared here;
+ * presumably they are inherited from {@link V5ClientBaseTest}.
+ *
+ * <p>Multi-segment cumulative-ack-with-position-vector behavior and consumer
+ * rebalance scenarios live in dedicated scalable-topic suites.
+ */
+public class V5StreamConsumerBasicTest extends V5ClientBaseTest {
+
+ @Test
+ public void testProduceAndCumulativeAck() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("stream-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ int numMessages = 50;
+ for (int i = 0; i < numMessages; i++) {
+ producer.newMessage().value("msg-" + i).send();
+ }
+
+ // Receive all and ack cumulatively at the last id only.
+ MessageId last = null;
+ for (int i = 0; i < numMessages; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected message " + i + " but receive timed
out");
+ // Single segment: stream consumer must preserve send order.
+ assertEquals(msg.value(), "msg-" + i,
+ "out-of-order delivery at index " + i);
+ last = msg.id();
+ }
+ assertNotNull(last);
+ consumer.acknowledgeCumulative(last);
+
+ // Nothing should redeliver after a cumulative ack of the last id.
+ assertNull(consumer.receive(Duration.ofMillis(200)),
+ "stream consumer redelivered a message after cumulative ack of
last id");
+ }
+
+ @Test
+ public void testReceiveMultiReturnsBatch() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("multi-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ int produced = 10;
+ for (int i = 0; i < produced; i++) {
+ producer.newMessage().value("v-" + i).send();
+ }
+
+ // Ask for up to 20 with a short timeout. Only 10 are available, so
receiveMulti
+ // waits until the timeout fires (or the cap is hit). Keep the timeout
small so
+ // the test stays fast. The assertion below accepts any batch of 1..produced messages.
+ Messages<String> batch = consumer.receiveMulti(20,
Duration.ofMillis(500));
+ assertNotNull(batch);
+ assertTrue(batch.count() >= 1 && batch.count() <= produced,
+ "unexpected batch size: " + batch.count());
+ assertNotNull(batch.lastId(), "lastId() must be set on a non-empty
batch");
+
+ // Iterating yields exactly batch.count() messages.
+ int seen = 0;
+ for (Message<String> ignored : batch) {
+ seen++;
+ }
+ assertEquals(seen, batch.count(),
+ "iteration count must match Messages.count()");
+ consumer.acknowledgeCumulative(batch.lastId());
+ }
+
+ @Test
+ public void testReceiveTimeoutReturnsNullWhenNoMessages() throws Exception
{
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("idle-sub")
+ .subscribe();
+
+ Message<String> msg = consumer.receive(Duration.ofMillis(200));
+ assertNull(msg, "receive(timeout) must return null on idle topic");
+ }
+
+ @Test
+ public void testTopicSubscriptionAndConsumerNameAccessors() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("accessors-sub")
+ .consumerName("accessors-consumer")
+ .subscribe();
+
+ assertEquals(consumer.topic(), topic);
+ assertEquals(consumer.subscription(), "accessors-sub");
+ assertEquals(consumer.consumerName(), "accessors-consumer");
+ }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index 534f885663d..34413a8153f 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -122,7 +122,7 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
@Override
public Message<T> receive() throws PulsarClientException {
try {
- return messageQueue.take();
+ return advanceCheckpoint(messageQueue.take());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException("Receive interrupted", e);
@@ -132,7 +132,7 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
@Override
public Message<T> receive(Duration timeout) throws PulsarClientException {
try {
- return messageQueue.poll(timeout.toMillis(),
TimeUnit.MILLISECONDS);
+ return advanceCheckpoint(messageQueue.poll(timeout.toMillis(),
TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException("Receive interrupted", e);
@@ -154,16 +154,38 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
if (msg == null) {
break;
}
- batch.add(msg);
+ batch.add(advanceCheckpoint(msg));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new PulsarClientException("Receive interrupted", e);
}
- messageQueue.drainTo(batch, maxMessages - batch.size());
+ // Drain whatever else is immediately ready up to maxMessages.
+ List<Message<T>> drained = new ArrayList<>();
+ messageQueue.drainTo(drained, maxMessages - batch.size());
+ for (Message<T> drainedMsg : drained) {
+ batch.add(advanceCheckpoint(drainedMsg));
+ }
}
return new MessagesV5<>(batch);
}
+ /**
+ * Records the given message as the latest one handed to the application for
+ * its segment, then returns it unchanged. Called as messages cross the
+ * boundary from the wire-buffer to the application — that's the point at
+ * which a subsequent {@link #checkpoint()} should reflect "I have processed
+ * this message", so a {@link #seek(Checkpoint)} back to that checkpoint
+ * redelivers everything after it.
+ *
+ * <p>{@code msg} may be null: the timed {@code receive} passes the poll
+ * result straight through, and that poll yields null when the timeout
+ * expires (the interrupt paths throw {@code PulsarClientException} instead
+ * of returning). A null is returned unchanged so the caller can pass through
+ * the receive result without an extra null-check. A message whose id is not
+ * a {@code MessageIdV5} is likewise returned as-is, leaving
+ * {@code lastReceivedPositions} untouched.
+ *
+ * @param msg the message about to be returned to the caller, or {@code null}
+ * @return the same {@code msg} reference that was passed in
+ */
+ private Message<T> advanceCheckpoint(Message<T> msg) {
+ if (msg != null && msg.id() instanceof MessageIdV5 id) {
+ lastReceivedPositions.put(id.segmentId(), id.v4MessageId());
+ }
+ return msg;
+ }
+
@Override
public Checkpoint checkpoint() {
Map<Long, org.apache.pulsar.client.api.MessageId> positions = new
HashMap<>(lastReceivedPositions);
@@ -348,7 +370,12 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
private void startReadLoop(Reader<T> reader, long segmentId) {
reader.readNextAsync().thenAccept(v4Msg -> {
- lastReceivedPositions.put(segmentId, v4Msg.getMessageId());
+ // Don't advance the checkpoint here — the read loop pre-fetches
into the
+ // queue, so updating per-segment positions on wire-receive would
skip
+ // messages that the application hasn't pulled yet (e.g., a
checkpoint()
+ // taken right after the app received message N could already
point past
+ // N+1 if the read loop got ahead). The advance happens in
receive() /
+ // receiveMulti() instead, where the message crosses into
application code.
messageQueue.add(new MessageV5<>(v4Msg, segmentId));
if (!closed) {
startReadLoop(reader, segmentId);
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 4a1dae8bcdf..1bbaf8905cb 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -317,9 +317,13 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
private CompletableFuture<org.apache.pulsar.client.api.Consumer<T>>
createSegmentConsumerAsync(
ActiveSegment segment) {
PulsarClientImpl v4Client = client.v4Client();
- var segConf = new
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData<T>();
+ // Clone the user-facing consumer config so per-segment consumers
inherit every
+ // builder knob (receiverQueueSize / ackTimeout / nack backoff / DLQ /
...) and not
+ // just the few fields we explicitly carry over.
+ var segConf = consumerConf.clone();
+ segConf.getTopicNames().clear();
+ segConf.setTopicsPattern(null);
segConf.getTopicNames().add(segment.segmentTopicName());
- segConf.setSubscriptionName(subscriptionName);
segConf.setSubscriptionType(SubscriptionType.Shared);
if (consumerConf.getConsumerName() != null) {
segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" +
segment.segmentId());
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index 5137ce0462f..15b969d60c1 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -305,9 +305,12 @@ final class ScalableStreamConsumer<T> implements
StreamConsumer<T>, DagWatchClie
private CompletableFuture<org.apache.pulsar.client.api.Consumer<T>>
createSegmentConsumerAsync(
ActiveSegment segment) {
PulsarClientImpl v4Client = client.v4Client();
- var segConf = new
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData<T>();
+ // Clone so per-segment consumers inherit every builder knob the user
set
+ // (ackTimeout, readCompacted, replicateSubscriptionState, encryption,
...).
+ var segConf = consumerConf.clone();
+ segConf.getTopicNames().clear();
+ segConf.setTopicsPattern(null);
segConf.getTopicNames().add(segment.segmentTopicName());
- segConf.setSubscriptionName(subscriptionName);
segConf.setSubscriptionType(SubscriptionType.Exclusive);
if (consumerConf.getConsumerName() != null) {
segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" +
segment.segmentId());