This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c94e3f56c72 [feat] [test] PIP-468: V5 producer/consumer knob coverage
+ related client fixes (#25588)
c94e3f56c72 is described below
commit c94e3f56c720126e70c5309d6fa50a26f1b13005
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Apr 28 08:51:02 2026 -0700
[feat] [test] PIP-468: V5 producer/consumer knob coverage + related client
fixes (#25588)
---
.../pulsar/client/api/v5/V5AckTimeoutTest.java | 66 +++++++
.../client/api/v5/V5DeadLetterPolicyTest.java | 91 +++++++++
.../client/api/v5/V5MessageMetadataTest.java | 212 +++++++++++++++++++++
.../client/api/v5/V5ProducerAccessModeTest.java | 133 +++++++++++++
.../client/api/v5/V5ProducerBatchingTest.java | 106 +++++++++++
.../client/api/v5/V5ProducerCompressionTest.java | 109 +++++++++++
.../client/api/v5/V5ProducerSequenceIdTest.java | 128 +++++++++++++
.../client/api/v5/V5SchemaRoundtripTest.java | 187 ++++++++++++++++++
.../api/v5/V5SubscriptionInitialPositionTest.java | 154 +++++++++++++++
.../pulsar/client/impl/v5/ProducerBuilderV5.java | 22 ++-
.../client/impl/v5/ScalableTopicProducer.java | 128 ++++++++++---
11 files changed, 1305 insertions(+), 31 deletions(-)
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
new file mode 100644
index 00000000000..3d5513f6270
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link QueueConsumerBuilder#ackTimeout(Duration)}: when a
consumer
+ * receives a message but doesn't ack within the configured timeout, the broker
+ * redelivers it.
+ */
+public class V5AckTimeoutTest extends V5ClientBaseTest {
+
+ @Test
+ public void testUnackedMessageIsRedeliveredAfterAckTimeout() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ // Default ack timeout is disabled (or 60s); use a tight one so the
test stays fast.
+ // Pulsar enforces a minimum of 1s on ackTimeout.
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("ack-timeout-sub")
+ .ackTimeout(Duration.ofSeconds(1))
+ .subscribe();
+
+ producer.newMessage().value("once").send();
+
+ // Receive but don't ack.
+ Message<String> first = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(first);
+ assertEquals(first.value(), "once");
+
+ // The broker's ack-timeout sweeper runs at ackTimeout/2 cadence, so
wait
+ // generously past 1s for the redelivery to fire.
+ Message<String> redelivered = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(redelivered, "ack-timeout did not trigger redelivery");
+ assertEquals(redelivered.value(), "once");
+ consumer.acknowledge(redelivered.id());
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
new file mode 100644
index 00000000000..be8dfd7f808
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
+import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link
QueueConsumerBuilder#deadLetterPolicy(DeadLetterPolicy)}: after
+ * {@code maxRedeliverCount} negative-acks, the broker forwards the message to
the
+ * configured dead-letter topic.
+ *
+ * <p><b>Known V5 gap:</b> the DLQ topic is currently a non-scalable
persistent topic.
+ * The V5 source consumer's underlying v4 {@code ConsumerImpl} creates the DLQ
producer
+ * via {@code client.newProducer(...)}, which rejects {@code topic://}
scalable topic
+ * names. Routing the DLQ producer through V5's segment-bypass path is
required before
+ * a scalable DLQ can be used here.
+ */
+public class V5DeadLetterPolicyTest extends V5ClientBaseTest {
+
+ @Test
+ public void testMessageGoesToDlqAfterMaxRedeliveries() throws Exception {
+ String topic = newScalableTopic(1);
+ // Non-scalable DLQ topic — see class-level note about the V5 gap.
+ String dlqTopic = "persistent://" + getNamespace() + "/dlq-explicit";
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("dlq-sub")
+ .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
+ Duration.ofMillis(200), Duration.ofMillis(200)))
+ .deadLetterPolicy(new DeadLetterPolicy(2, null, dlqTopic,
null))
+ .subscribe();
+
+ // Subscribe to the DLQ via the V4 client (DLQ is a regular persistent
topic).
+ @Cleanup
+ org.apache.pulsar.client.api.Consumer<String> dlqConsumer =
pulsarClient
+ .newConsumer(org.apache.pulsar.client.api.Schema.STRING)
+ .topic(dlqTopic)
+ .subscriptionName("dlq-watcher")
+ .subscriptionInitialPosition(
+
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest)
+ .subscribe();
+
+ producer.newMessage().value("dead").send();
+
+ // V4 DLQ kicks in only when redeliveryCount > maxRedeliverCount
(strictly greater).
+ // With maxRedeliverCount=2, the user sees deliveries with counts 0,
1, 2 (three
+ // total), and the fourth delivery (count=3) is intercepted and
forwarded to DLQ.
+ for (int i = 0; i < 3; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected delivery #" + (i + 1));
+ assertEquals(msg.value(), "dead");
+ consumer.negativeAcknowledge(msg.id());
+ }
+
+ // Now it should appear on the DLQ topic.
+ org.apache.pulsar.client.api.Message<String> dlqMsg =
+ dlqConsumer.receive(10, java.util.concurrent.TimeUnit.SECONDS);
+ assertNotNull(dlqMsg, "message did not land on the DLQ topic");
+ assertEquals(dlqMsg.getValue(), "dead");
+ dlqConsumer.acknowledge(dlqMsg);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageMetadataTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageMetadataTest.java
new file mode 100644
index 00000000000..7bcecc043c8
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageMetadataTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the {@link MessageBuilder} metadata setters: {@code
property}/{@code properties},
+ * {@code eventTime}, {@code deliverAfter}, {@code deliverAt}. Each scenario
sends one message
+ * with the metadata set on the producer side and asserts the consumer sees it
back.
+ *
+ * <p>Single-segment scalable topic — the wire format is the same regardless
of segment count
+ * so multi-segment tests don't add coverage here.
+ */
+public class V5MessageMetadataTest extends V5ClientBaseTest {
+
+ @Test
+ public void testEventTimeIsPropagated() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("event-time-sub")
+ .subscribe();
+
+ Instant eventTime = Instant.parse("2024-01-15T08:30:00Z");
+ producer.newMessage()
+ .value("with-event-time")
+ .eventTime(eventTime)
+ .send();
+
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ assertTrue(msg.eventTime().isPresent(), "consumer must see the event
time");
+ assertEquals(msg.eventTime().get(), eventTime);
+ consumer.acknowledge(msg.id());
+ }
+
+ @Test
+ public void testEventTimeAbsentWhenNotSet() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("no-event-time-sub")
+ .subscribe();
+
+ producer.newMessage().value("no-event-time").send();
+
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ // The wire carries event time as 0 when unset; the V5 API must
surface that as
+ // an empty Optional so users can distinguish "no event time" from
"epoch".
+ assertTrue(msg.eventTime().isEmpty(),
+ "eventTime should be absent when the producer didn't set it,
was: " + msg.eventTime());
+ consumer.acknowledge(msg.id());
+ }
+
+ @Test
+ public void testPropertiesPropagateOneAtATime() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("props-single-sub")
+ .subscribe();
+
+ producer.newMessage()
+ .value("with-props")
+ .property("a", "1")
+ .property("b", "two")
+ .send();
+
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ assertEquals(msg.properties(), Map.of("a", "1", "b", "two"));
+ consumer.acknowledge(msg.id());
+ }
+
+ @Test
+ public void testPropertiesPropagateAsBatch() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("props-batch-sub")
+ .subscribe();
+
+ Map<String, String> props = Map.of("k1", "v1", "k2", "v2", "k3", "v3");
+ producer.newMessage()
+ .value("with-props-batch")
+ .properties(props)
+ .send();
+
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ assertEquals(msg.properties(), props);
+ consumer.acknowledge(msg.id());
+ }
+
+ @Test
+ public void testDeliverAfterDelaysVisibility() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ // Shared subscription is required for delayed delivery.
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("delay-after-sub")
+ .subscribe();
+
+ long sentNanos = System.nanoTime();
+ producer.newMessage()
+ .value("delayed")
+ .deliverAfter(Duration.ofSeconds(2))
+ .send();
+
+ // A short poll right after send must not see the message yet.
+ assertNull(consumer.receive(Duration.ofMillis(500)),
+ "delayed message must not be visible before deliverAfter
elapses");
+
+ // Wait long enough for the broker's redelivery tracker to release it.
+ Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+ long elapsedMs = (System.nanoTime() - sentNanos) / 1_000_000;
+ assertNotNull(msg, "delayed message did not arrive after delay");
+ assertEquals(msg.value(), "delayed");
+ assertTrue(elapsedMs >= 1500,
+ "deliverAfter delivered too early: " + elapsedMs + "ms");
+ consumer.acknowledge(msg.id());
+ }
+
+ @Test
+ public void testDeliverAtDelaysVisibility() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("delay-at-sub")
+ .subscribe();
+
+ Instant deliverAt = Instant.now().plusSeconds(2);
+ producer.newMessage()
+ .value("delayed-at")
+ .deliverAt(deliverAt)
+ .send();
+
+ assertNull(consumer.receive(Duration.ofMillis(500)),
+ "deliverAt must not deliver before the timestamp");
+
+ Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(msg, "deliverAt did not deliver");
+ assertEquals(msg.value(), "delayed-at");
+ // Allow 500ms slack on the lower bound for clock-skew / scheduling
jitter.
+ assertTrue(Instant.now().isAfter(deliverAt.minusMillis(500)),
+ "delivered earlier than the configured timestamp");
+ consumer.acknowledge(msg.id());
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerAccessModeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerAccessModeTest.java
new file mode 100644
index 00000000000..7424ae30fa2
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerAccessModeTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.ProducerAccessMode;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Producer {@link ProducerAccessMode} coverage:
+ * <ul>
+ * <li>SHARED is the default — multiple producers can coexist.</li>
+ * <li>EXCLUSIVE — first producer succeeds, a second EXCLUSIVE attempt fails
fast
+ * with {@code ProducerBusyException}.</li>
+ * <li>WAIT_FOR_EXCLUSIVE — second producer blocks until the first closes,
then
+ * takes over.</li>
+ * </ul>
+ */
+public class V5ProducerAccessModeTest extends V5ClientBaseTest {
+
+ @Test
+ public void testSharedAllowsMultipleProducers() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> p1 = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .accessMode(ProducerAccessMode.SHARED)
+ .create();
+ @Cleanup
+ Producer<String> p2 = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .accessMode(ProducerAccessMode.SHARED)
+ .create();
+
+ // Both producers are usable concurrently.
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("shared-sub")
+ .subscribe();
+
+ p1.newMessage().value("from-p1").send();
+ p2.newMessage().value("from-p2").send();
+
+ Message<String> first = consumer.receive(Duration.ofSeconds(5));
+ Message<String> second = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(first);
+ assertNotNull(second);
+ consumer.acknowledge(first.id());
+ consumer.acknowledge(second.id());
+ }
+
+ @Test
+ public void testExclusiveRejectsSecondProducer() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> first = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .accessMode(ProducerAccessMode.EXCLUSIVE)
+ .create();
+
+ // Second EXCLUSIVE on the same topic must fail at create() — the V5
builder
+ // eagerly claims every active segment up front for exclusive access
modes.
+ try {
+ v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .accessMode(ProducerAccessMode.EXCLUSIVE)
+ .create();
+ fail("a second EXCLUSIVE producer should not be able to attach");
+ } catch (PulsarClientException expected) {
+ String msg = expected.getMessage() == null ? "" :
expected.getMessage();
+ assertTrue(msg.contains("Busy") || msg.contains("Fenced")
+ || msg.contains("ProducerFenced") ||
msg.contains("ProducerBusy"),
+ "unexpected error message for second EXCLUSIVE producer: "
+ msg);
+ }
+
+ // The first producer is still healthy and can send.
+ first.newMessage().value("still-here").send();
+ }
+
+ @Test
+ public void testWaitForExclusiveSucceedsAfterFirstReleases() throws
Exception {
+ String topic = newScalableTopic(1);
+
+ Producer<String> first = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .accessMode(ProducerAccessMode.EXCLUSIVE)
+ .create();
+ first.newMessage().value("first-claim").send();
+
+ // Start a WAIT_FOR_EXCLUSIVE create() on a separate thread — it
should block
+ // until the first producer releases. We can't observe blocking
directly, so we
+ // assert it has not completed before close, then completes once close
lands.
+ java.util.concurrent.CompletableFuture<Producer<String>> secondFuture =
+ v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .accessMode(ProducerAccessMode.WAIT_FOR_EXCLUSIVE)
+ .createAsync();
+ Thread.sleep(500);
+ org.testng.Assert.assertFalse(secondFuture.isDone(),
+ "WAIT_FOR_EXCLUSIVE create() must block while the first
producer holds the claim");
+
+ first.close();
+
+ @Cleanup
+ Producer<String> second = secondFuture.get(10,
java.util.concurrent.TimeUnit.SECONDS);
+ MessageId id = second.newMessage().value("after-takeover").send();
+ assertNotNull(id);
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
new file mode 100644
index 00000000000..7e816b0663e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Producer batching coverage. Default / disabled / custom batching policies
all have to
+ * deliver the same observable end-to-end result: every produced message lands
at the
+ * consumer in send order with the right value.
+ *
+ * <p>Single-segment topic so message order is fully determined by the
producer side.
+ */
+public class V5ProducerBatchingTest extends V5ClientBaseTest {
+
+ private void produceAndVerify(BatchingPolicy policy, int count, String
subSuffix)
+ throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .batchingPolicy(policy)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("batching-" + subSuffix)
+ .subscribe();
+
+ // Use the async producer so we don't pay the round-trip per message —
that way
+ // batching can actually batch when enabled.
+ CompletableFuture<?>[] sends = new CompletableFuture<?>[count];
+ for (int i = 0; i < count; i++) {
+ sends[i] = producer.async().newMessage().value("v-" + i).send();
+ }
+ producer.async().flush().get(5, TimeUnit.SECONDS);
+ for (CompletableFuture<?> send : sends) {
+ assertNotNull(send.get(5, TimeUnit.SECONDS));
+ }
+
+ Set<String> received = new HashSet<>();
+ for (int i = 0; i < count; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "expected message " + i + " of " + count);
+ received.add(msg.value());
+ consumer.acknowledge(msg.id());
+ }
+ assertEquals(received.size(), count, "duplicate or missing messages");
+ }
+
+ @Test
+ public void testDefaultBatchingDeliversAllMessages() throws Exception {
+ produceAndVerify(BatchingPolicy.ofDefault(), 100, "default");
+ }
+
+ @Test
+ public void testDisabledBatchingDeliversAllMessages() throws Exception {
+ produceAndVerify(BatchingPolicy.ofDisabled(), 100, "disabled");
+ }
+
+ @Test
+ public void testTightBatchingByDelay() throws Exception {
+ // Small max-publish-delay forces batches to flush quickly; both ends
still see
+ // every message in order.
+ BatchingPolicy tight = BatchingPolicy.of(
+ Duration.ofMillis(5), 100, MemorySize.ofMegabytes(1));
+ produceAndVerify(tight, 50, "tight-delay");
+ }
+
+ @Test
+ public void testBatchingWithSmallBatchSize() throws Exception {
+ // Cap the batch at 5 messages — exercises the maxMessages branch of
the batching
+ // packer so a 50-message stream gets cut into 10 batches.
+ BatchingPolicy small = BatchingPolicy.of(
+ Duration.ofSeconds(1), 5, MemorySize.ofMegabytes(1));
+ produceAndVerify(small, 50, "small-batch");
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerCompressionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerCompressionTest.java
new file mode 100644
index 00000000000..e8521bf471e
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerCompressionTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.CompressionPolicy;
+import org.apache.pulsar.client.api.v5.config.CompressionType;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Producer-side compression coverage. For each {@link CompressionType} (plus
the
+ * disabled / default cases), produce a payload that's redundant enough to
actually
+ * compress, then read it back through a V5 consumer and confirm the
decompression
+ * yields the original value.
+ */
+public class V5ProducerCompressionTest extends V5ClientBaseTest {
+
+ private static final String PAYLOAD = "x".repeat(4096);
+
+ private void roundtripWithPolicy(CompressionPolicy policy, String
subSuffix) throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .compressionPolicy(policy)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("compression-" + subSuffix)
+ .subscribe();
+
+ producer.newMessage().value(PAYLOAD).send();
+
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "compressed message did not arrive within 5s");
+ assertEquals(msg.value(), PAYLOAD,
+ "decompressed value does not match what was sent");
+ consumer.acknowledge(msg.id());
+ }
+
+ @Test
+ public void testCompressionDisabled() throws Exception {
+ roundtripWithPolicy(CompressionPolicy.disabled(), "disabled");
+ }
+
+ @Test
+ public void testCompressionLz4() throws Exception {
+ roundtripWithPolicy(CompressionPolicy.of(CompressionType.LZ4), "lz4");
+ }
+
+ @Test
+ public void testCompressionZlib() throws Exception {
+ roundtripWithPolicy(CompressionPolicy.of(CompressionType.ZLIB),
"zlib");
+ }
+
+ @Test
+ public void testCompressionZstd() throws Exception {
+ roundtripWithPolicy(CompressionPolicy.of(CompressionType.ZSTD),
"zstd");
+ }
+
+ @Test
+ public void testCompressionSnappy() throws Exception {
+ roundtripWithPolicy(CompressionPolicy.of(CompressionType.SNAPPY),
"snappy");
+ }
+
+ @Test
+ public void testNoCompressionPolicySetUsesDefault() throws Exception {
+ // Skip the helper — exercise the "didn't call compressionPolicy()"
path.
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("compression-default-sub")
+ .subscribe();
+
+ producer.newMessage().value(PAYLOAD).send();
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ assertEquals(msg.value(), PAYLOAD);
+ consumer.acknowledge(msg.id());
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSequenceIdTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSequenceIdTest.java
new file mode 100644
index 00000000000..f2e9da177ff
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSequenceIdTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Producer sequence-id behavior: {@link Producer#lastSequenceId()},
+ * {@link ProducerBuilder#initialSequenceId(long)}, and explicit per-message
+ * {@link MessageMetadata#sequenceId(long)}.
+ *
+ * <p>Single-segment scalable topic with broker-side deduplication enabled by
the shared
+ * cluster (see {@code
SharedPulsarCluster.setBrokerDeduplicationEnabled(true)}), so
+ * sending the same {@code sequenceId} twice gives the producer a no-op on the
second
+ * call.
+ */
+public class V5ProducerSequenceIdTest extends V5ClientBaseTest {
+
+ @Test
+ public void testLastSequenceIdAdvancesAsMessagesAreSent() throws Exception
{
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .producerName("seq-1")
+ .create();
+
+ // Before any send, lastSequenceId is "no message yet" (-1).
+ assertEquals(producer.lastSequenceId(), -1L,
+ "lastSequenceId before first send must be -1");
+
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value("v-" + i).send();
+ }
+ // Without explicit sequenceId, the producer auto-increments from 0;
after 5
+ // sends the last one had id 4.
+ assertEquals(producer.lastSequenceId(), 4L);
+ }
+
+ @Test
+ public void testInitialSequenceIdShiftsTheStartingPoint() throws Exception
{
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .producerName("seq-2")
+ .initialSequenceId(99)
+ .create();
+
+ // Before send: lastSequenceId reflects the configured initial.
+ assertEquals(producer.lastSequenceId(), 99L,
+ "initialSequenceId must seed lastSequenceId");
+
+ producer.newMessage().value("post-init").send();
+ assertEquals(producer.lastSequenceId(), 100L,
+ "next message after initialSequenceId=99 should have id 100");
+ }
+
+ @Test
+ public void testExplicitSequenceIdIsHonored() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .producerName("seq-3")
+ .create();
+
+ producer.newMessage().value("explicit").sequenceId(123L).send();
+ assertEquals(producer.lastSequenceId(), 123L);
+ }
+
+ @Test
+ public void testDeduplicationDropsRepeatedSequenceId() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .producerName("seq-dedup")
+ .create();
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("dedup-sub")
+ .subscribe();
+
+ // Same sequenceId twice — the broker must accept the first and dedup
the second.
+ // Both sends complete without an exception (the second returns a
sentinel message
+ // id; we don't pin the exact value because that's a broker-internal
contract).
+ MessageId firstId =
producer.newMessage().value("once").sequenceId(7L).send();
+ assertNotNull(firstId);
+ producer.newMessage().value("once-dup").sequenceId(7L).send();
+
+ // Observable behavior: consumer sees the first message exactly once.
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ assertEquals(msg.value(), "once");
+ consumer.acknowledge(msg.id());
+
+ assertNull(consumer.receive(Duration.ofMillis(200)),
+ "duplicate sequenceId must not produce a second
consumer-visible message");
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SchemaRoundtripTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SchemaRoundtripTest.java
new file mode 100644
index 00000000000..7a79ddd5d8a
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SchemaRoundtripTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.Objects;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.schema.proto.Test.TestEnum;
+import org.apache.pulsar.client.api.schema.proto.Test.TestMessage;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Roundtrip every {@code Schema.*} factory through the V5 producer / consumer
wire path
+ * on a single-segment scalable topic. One test per schema; each sends a
sentinel value
+ * and asserts the consumer reads it back unchanged.
+ *
+ * <p>{@code autoProduceBytes} requires a more elaborate setup (broker-side
schema must
+ * already be registered for the topic) and is covered separately under
producer knobs.
+ */
+public class V5SchemaRoundtripTest extends V5ClientBaseTest {
+
+ private <T> T roundtrip(Schema<T> schema, T value) throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<T> producer = v5Client.newProducer(schema)
+ .topic(topic)
+ .create();
+ @Cleanup
+ QueueConsumer<T> consumer = v5Client.newQueueConsumer(schema)
+ .topic(topic)
+ .subscriptionName("schema-sub")
+ .subscribe();
+
+ producer.newMessage().value(value).send();
+ Message<T> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "message did not arrive within 5s");
+ T received = msg.value();
+ consumer.acknowledge(msg.id());
+ return received;
+ }
+
+ @Test
+ public void testBytes() throws Exception {
+ byte[] sent = "hello-bytes".getBytes();
+ byte[] got = roundtrip(Schema.bytes(), sent);
+ assertEquals(got, sent);
+ }
+
+ @Test
+ public void testString() throws Exception {
+ assertEquals(roundtrip(Schema.string(), "hello-string"),
"hello-string");
+ }
+
+ @Test
+ public void testBool() throws Exception {
+ assertEquals(roundtrip(Schema.bool(), Boolean.TRUE), Boolean.TRUE);
+ }
+
+ @Test
+ public void testInt8() throws Exception {
+ assertEquals(roundtrip(Schema.int8(), (byte) -7), (byte) -7);
+ }
+
+ @Test
+ public void testInt16() throws Exception {
+ assertEquals(roundtrip(Schema.int16(), (short) 12345), (short) 12345);
+ }
+
+ @Test
+ public void testInt32() throws Exception {
+ assertEquals(roundtrip(Schema.int32(), 123_456_789),
Integer.valueOf(123_456_789));
+ }
+
+ @Test
+ public void testInt64() throws Exception {
+ long v = 1234567890123L;
+ assertEquals(roundtrip(Schema.int64(), v), Long.valueOf(v));
+ }
+
+ @Test
+ public void testFloat32() throws Exception {
+ assertEquals(roundtrip(Schema.float32(), 3.14f), 3.14f, 1e-6f);
+ }
+
+ @Test
+ public void testFloat64() throws Exception {
+ assertEquals(roundtrip(Schema.float64(), 2.718281828d), 2.718281828d,
1e-9);
+ }
+
+ @Test
+ public void testJson() throws Exception {
+ Pojo sent = new Pojo("alice", 30);
+ Pojo got = roundtrip(Schema.json(Pojo.class), sent);
+ assertEquals(got, sent);
+ }
+
+ @Test
+ public void testAvro() throws Exception {
+ Pojo sent = new Pojo("bob", 42);
+ Pojo got = roundtrip(Schema.avro(Pojo.class), sent);
+ assertEquals(got, sent);
+ }
+
+ @Test
+ public void testProtobuf() throws Exception {
+ TestMessage sent = TestMessage.newBuilder()
+ .setStringField("proto-roundtrip")
+ .setDoubleField(3.14159)
+ .setIntField(42)
+ .setTestEnum(TestEnum.SHARED)
+ .build();
+ TestMessage got = roundtrip(Schema.protobuf(TestMessage.class), sent);
+ assertEquals(got.getStringField(), "proto-roundtrip");
+ assertEquals(got.getDoubleField(), 3.14159, 1e-9);
+ assertEquals(got.getIntField(), 42);
+ assertEquals(got.getTestEnum(), TestEnum.SHARED);
+ }
+
+ /**
+ * Tiny POJO for json/avro schema roundtrips. Public no-args +
getters/setters
+ * keep both Jackson (json) and Avro reflection happy.
+ */
+ public static class Pojo {
+ private String name;
+ private int age;
+
+ public Pojo() {
+ }
+
+ public Pojo(String name, int age) {
+ this.name = name;
+ this.age = age;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public int getAge() {
+ return age;
+ }
+
+ public void setAge(int age) {
+ this.age = age;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof Pojo other)) {
+ return false;
+ }
+ return age == other.age && Objects.equals(name, other.name);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, age);
+ }
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SubscriptionInitialPositionTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SubscriptionInitialPositionTest.java
new file mode 100644
index 00000000000..a4a16f5015d
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SubscriptionInitialPositionTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code subscriptionInitialPosition}. For each consumer type
that takes
+ * the option (Queue, Stream — the CheckpointConsumer uses {@code
startPosition} instead,
+ * tested separately), assert that EARLIEST sees prior data and LATEST does
not.
+ */
+public class V5SubscriptionInitialPositionTest extends V5ClientBaseTest {
+
+ @Test
+ public void testQueueConsumerEarliestSeesPriorMessages() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+
+ // Pre-existing data.
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value("pre-" + i).send();
+ }
+
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("queue-earliest-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "EARLIEST queue consumer must see pre-existing
pre-" + i);
+ assertEquals(msg.value(), "pre-" + i);
+ consumer.acknowledge(msg.id());
+ }
+ }
+
+ @Test
+ public void testQueueConsumerLatestSkipsPriorMessages() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value("pre-" + i).send();
+ }
+
+ @Cleanup
+ QueueConsumer<String> consumer =
v5Client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("queue-latest-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.LATEST)
+ .subscribe();
+
+ // Idle for now — pre-existing messages must be invisible.
+ assertNull(consumer.receive(Duration.ofMillis(200)),
+ "LATEST queue consumer must skip pre-existing messages");
+
+ // Anything published after subscribe is delivered.
+ producer.newMessage().value("post-1").send();
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ assertEquals(msg.value(), "post-1");
+ consumer.acknowledge(msg.id());
+ }
+
+ @Test
+ public void testStreamConsumerEarliestSeesPriorMessages() throws Exception
{
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value("pre-" + i).send();
+ }
+
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("stream-earliest-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+ .subscribe();
+
+ MessageId last = null;
+ for (int i = 0; i < 5; i++) {
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg, "EARLIEST stream consumer must see pre-" + i);
+ assertEquals(msg.value(), "pre-" + i);
+ last = msg.id();
+ }
+ consumer.acknowledgeCumulative(last);
+ }
+
+ @Test
+ public void testStreamConsumerLatestSkipsPriorMessages() throws Exception {
+ String topic = newScalableTopic(1);
+
+ @Cleanup
+ Producer<String> producer = v5Client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ for (int i = 0; i < 5; i++) {
+ producer.newMessage().value("pre-" + i).send();
+ }
+
+ @Cleanup
+ StreamConsumer<String> consumer =
v5Client.newStreamConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("stream-latest-sub")
+
.subscriptionInitialPosition(SubscriptionInitialPosition.LATEST)
+ .subscribe();
+
+ assertNull(consumer.receive(Duration.ofMillis(200)),
+ "LATEST stream consumer must skip pre-existing messages");
+
+ producer.newMessage().value("post-1").send();
+ Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+ assertNotNull(msg);
+ assertEquals(msg.value(), "post-1");
+ consumer.acknowledgeCumulative(msg.id());
+ }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
index afc56c3723d..d5c261ede89 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
@@ -76,10 +76,17 @@ final class ProducerBuilderV5<T> implements
ProducerBuilder<T> {
DagWatchClient dagWatch = new DagWatchClient(client.v4Client(),
topicName);
return dagWatch.start()
- .thenApply(initialLayout -> {
+ .thenCompose(initialLayout -> {
ScalableTopicProducer<T> producer = new
ScalableTopicProducer<>(
client, v5Schema, conf, dagWatch, initialLayout);
- return (Producer<T>) producer;
+ // For exclusive access modes, claim every active segment
up front so
+ // a collision surfaces here instead of being deferred to
first send.
+ return producer.eagerAttachInitialAsync()
+ .thenApply(__ -> (Producer<T>) producer)
+ .exceptionallyCompose(ex ->
producer.closeAsync().handle((__, ___) -> {
+ throw ex instanceof
java.util.concurrent.CompletionException ce
+ ? ce : new
java.util.concurrent.CompletionException(ex);
+ }));
});
}
@@ -97,7 +104,16 @@ final class ProducerBuilderV5<T> implements
ProducerBuilder<T> {
@Override
public ProducerBuilderV5<T> accessMode(ProducerAccessMode accessMode) {
-
conf.setAccessMode(org.apache.pulsar.client.api.ProducerAccessMode.valueOf(accessMode.name()));
+ // V5 enum uses SCREAMING_SNAKE_CASE; v4 uses PascalCase, so a literal
valueOf(name())
+ // would throw IllegalArgumentException. Map explicitly.
+ conf.setAccessMode(switch (accessMode) {
+ case SHARED ->
org.apache.pulsar.client.api.ProducerAccessMode.Shared;
+ case EXCLUSIVE ->
org.apache.pulsar.client.api.ProducerAccessMode.Exclusive;
+ case EXCLUSIVE_WITH_FENCING ->
+
org.apache.pulsar.client.api.ProducerAccessMode.ExclusiveWithFencing;
+ case WAIT_FOR_EXCLUSIVE ->
+
org.apache.pulsar.client.api.ProducerAccessMode.WaitForExclusive;
+ });
return this;
}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 79ec430f2f6..8331b4d2fa9 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.v5.MessageBuilder;
import org.apache.pulsar.client.api.v5.Producer;
import org.apache.pulsar.client.api.v5.PulsarClientException;
@@ -102,8 +103,11 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
@Override
public long lastSequenceId() {
- // Aggregate: return the max across all segment producers
- long max = -1;
+ // Reflect the configured initialSequenceId even before any segment
producer has
+ // been created (segment producers are spun up lazily on first send),
so a caller
+ // that sets initialSequenceId(N) and immediately reads
lastSequenceId() sees N.
+ long max = producerConf.getInitialSequenceId() == null
+ ? -1L : producerConf.getInitialSequenceId();
for (var producer : segmentProducers.values()) {
max = Math.max(max, producer.getLastSequenceId());
}
@@ -296,6 +300,29 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
@Override
public void onLayoutChange(ClientSegmentLayout newLayout,
ClientSegmentLayout oldLayout) {
applyLayout(newLayout);
+ // After a layout update under an exclusive access mode, we want to
claim any
+ // newly-introduced segments eagerly so the exclusivity guarantee
covers the
+ // whole topic, not just segments hit by the next send. Best-effort:
this runs
+ // off the DagWatchClient callback and any failure is logged; the next
send to
+ // that segment will surface the error via the normal
PulsarClientException
+ // path. (The initial-create path uses {@link
#eagerAttachInitialAsync} for
+ // strict claim.)
+ if (requiresExclusiveAttach()) {
+ CompletableFuture.runAsync(() -> {
+ for (var seg : newLayout.activeSegments()) {
+ if (segmentProducers.containsKey(seg.segmentId())) {
+ continue;
+ }
+ try {
+ getOrCreateSegmentProducer(seg.segmentId());
+ } catch (PulsarClientException e) {
+ log.warn().attr("segmentId", seg.segmentId())
+ .exceptionMessage(e)
+ .log("Eager exclusive attach failed; will
retry on next send");
+ }
+ }
+ }, client.v4Client().getInternalExecutorService());
+ }
}
private void applyLayout(ClientSegmentLayout layout) {
@@ -322,11 +349,40 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
}
}
- // New segment producers will be created lazily on first message
log.info().attr("epoch", layout.epoch())
.attr("activeSegments", newSegmentIds).log("Layout applied");
}
+ /**
+ * Strict variant of the eager attach used at initial create time:
surfaces any
+ * exclusivity failure as a {@link PulsarClientException} so {@code
create()} fails
+ * up front instead of silently deferring the collision to first send.
+ */
+ CompletableFuture<Void> eagerAttachInitialAsync() {
+ if (!requiresExclusiveAttach()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return CompletableFuture.runAsync(() -> {
+ for (var seg : activeSegments) {
+ if (segmentProducers.containsKey(seg.segmentId())) {
+ continue;
+ }
+ try {
+ getOrCreateSegmentProducer(seg.segmentId());
+ } catch (PulsarClientException e) {
+ throw new java.util.concurrent.CompletionException(e);
+ }
+ }
+ }, client.v4Client().getInternalExecutorService());
+ }
+
+ private boolean requiresExclusiveAttach() {
+ ProducerAccessMode mode = producerConf.getAccessMode();
+ return mode == ProducerAccessMode.Exclusive
+ || mode == ProducerAccessMode.ExclusiveWithFencing
+ || mode == ProducerAccessMode.WaitForExclusive;
+ }
+
// --- Internal ---
private long routeMessage(String key) {
@@ -345,34 +401,50 @@ final class ScalableTopicProducer<T> implements
Producer<T>, DagWatchClient.Layo
return existing;
}
- return segmentProducers.computeIfAbsent(segmentId, id -> {
- // Find the segment topic name
- String segmentTopicName = null;
- for (var seg : activeSegments) {
- if (seg.segmentId() == id) {
- segmentTopicName = seg.segmentTopicName();
- break;
+ try {
+ return segmentProducers.computeIfAbsent(segmentId, id -> {
+ // Find the segment topic name
+ String segmentTopicName = null;
+ for (var seg : activeSegments) {
+ if (seg.segmentId() == id) {
+ segmentTopicName = seg.segmentTopicName();
+ break;
+ }
+ }
+ if (segmentTopicName == null) {
+ throw new RuntimeException("Segment " + id + " not found
in active segments");
}
- }
- if (segmentTopicName == null) {
- throw new RuntimeException("Segment " + id + " not found in
active segments");
- }
- try {
- PulsarClientImpl v4Client = client.v4Client();
- var segConf = new
org.apache.pulsar.client.impl.conf.ProducerConfigurationData();
- segConf.setTopicName(segmentTopicName);
- segConf.setSendTimeoutMs(producerConf.getSendTimeoutMs());
- segConf.setBlockIfQueueFull(producerConf.isBlockIfQueueFull());
- if (producerConf.getProducerName() != null
- && !producerConf.getProducerName().isEmpty()) {
- segConf.setProducerName(producerConf.getProducerName() +
"-seg-" + id);
+ try {
+ PulsarClientImpl v4Client = client.v4Client();
+ // Clone the user-facing producer config so per-segment
producers inherit
+ // every builder knob (compression, batching, chunking,
encryption,
+ // initialSequenceId, accessMode, properties, ...) and not
just the few
+ // fields explicitly carried over.
+ var segConf = producerConf.clone();
+ segConf.setTopicName(segmentTopicName);
+ if (producerConf.getProducerName() != null
+ && !producerConf.getProducerName().isEmpty()) {
+ segConf.setProducerName(producerConf.getProducerName()
+ "-seg-" + id);
+ }
+ return v4Client.createSegmentProducerAsync(segConf,
v4Schema)
+ .get();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
- return v4Client.createSegmentProducerAsync(segConf, v4Schema)
- .get();
- } catch (Exception e) {
- throw new RuntimeException(e);
+ });
+ } catch (RuntimeException re) {
+ // computeIfAbsent can't throw checked exceptions; unwrap a v4
PulsarClientException
+ // and rethrow as the V5 type so callers see the contract they
expect (and don't
+ // get a misleading bare RuntimeException for a producer-fenced /
busy segment).
+ Throwable cause = re.getCause();
+ while (cause instanceof java.util.concurrent.ExecutionException &&
cause.getCause() != null) {
+ cause = cause.getCause();
+ }
+ if (cause instanceof
org.apache.pulsar.client.api.PulsarClientException v4Exc) {
+ throw new PulsarClientException(v4Exc.getMessage(), v4Exc);
}
- });
+ throw re;
+ }
}
}