This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c94e3f56c72 [feat] [test] PIP-468: V5 producer/consumer knob coverage 
+ related client fixes (#25588)
c94e3f56c72 is described below

commit c94e3f56c720126e70c5309d6fa50a26f1b13005
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Apr 28 08:51:02 2026 -0700

    [feat] [test] PIP-468: V5 producer/consumer knob coverage + related client 
fixes (#25588)
---
 .../pulsar/client/api/v5/V5AckTimeoutTest.java     |  66 +++++++
 .../client/api/v5/V5DeadLetterPolicyTest.java      |  91 +++++++++
 .../client/api/v5/V5MessageMetadataTest.java       | 212 +++++++++++++++++++++
 .../client/api/v5/V5ProducerAccessModeTest.java    | 133 +++++++++++++
 .../client/api/v5/V5ProducerBatchingTest.java      | 106 +++++++++++
 .../client/api/v5/V5ProducerCompressionTest.java   | 109 +++++++++++
 .../client/api/v5/V5ProducerSequenceIdTest.java    | 128 +++++++++++++
 .../client/api/v5/V5SchemaRoundtripTest.java       | 187 ++++++++++++++++++
 .../api/v5/V5SubscriptionInitialPositionTest.java  | 154 +++++++++++++++
 .../pulsar/client/impl/v5/ProducerBuilderV5.java   |  22 ++-
 .../client/impl/v5/ScalableTopicProducer.java      | 128 ++++++++++---
 11 files changed, 1305 insertions(+), 31 deletions(-)

diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
new file mode 100644
index 00000000000..3d5513f6270
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5AckTimeoutTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link QueueConsumerBuilder#ackTimeout(Duration)}: when a 
consumer
+ * receives a message but doesn't ack within the configured timeout, the broker
+ * redelivers it.
+ */
+public class V5AckTimeoutTest extends V5ClientBaseTest {
+
+    @Test
+    public void testUnackedMessageIsRedeliveredAfterAckTimeout() throws 
Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        // Default ack timeout is disabled (or 60s); use a tight one so the 
test stays fast.
+        // Pulsar enforces a minimum of 1s on ackTimeout.
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("ack-timeout-sub")
+                .ackTimeout(Duration.ofSeconds(1))
+                .subscribe();
+
+        producer.newMessage().value("once").send();
+
+        // Receive but don't ack.
+        Message<String> first = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(first);
+        assertEquals(first.value(), "once");
+
+        // The broker's ack-timeout sweeper runs at ackTimeout/2 cadence, so 
wait
+        // generously past 1s for the redelivery to fire.
+        Message<String> redelivered = consumer.receive(Duration.ofSeconds(10));
+        assertNotNull(redelivered, "ack-timeout did not trigger redelivery");
+        assertEquals(redelivered.value(), "once");
+        consumer.acknowledge(redelivered.id());
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
new file mode 100644
index 00000000000..be8dfd7f808
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5DeadLetterPolicyTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.BackoffPolicy;
+import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@link 
QueueConsumerBuilder#deadLetterPolicy(DeadLetterPolicy)}: after
+ * {@code maxRedeliverCount} negative-acks, the broker forwards the message to 
the
+ * configured dead-letter topic.
+ *
+ * <p><b>Known V5 gap:</b> the DLQ topic is currently a non-scalable 
persistent topic.
+ * The V5 source consumer's underlying v4 {@code ConsumerImpl} creates the DLQ 
producer
+ * via {@code client.newProducer(...)}, which rejects {@code topic://} 
scalable topic
+ * names. Routing the DLQ producer through V5's segment-bypass path is 
required before
+ * a scalable DLQ can be used here.
+ */
+public class V5DeadLetterPolicyTest extends V5ClientBaseTest {
+
+    @Test
+    public void testMessageGoesToDlqAfterMaxRedeliveries() throws Exception {
+        String topic = newScalableTopic(1);
+        // Non-scalable DLQ topic — see class-level note about the V5 gap.
+        String dlqTopic = "persistent://" + getNamespace() + "/dlq-explicit";
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dlq-sub")
+                .negativeAckRedeliveryBackoff(BackoffPolicy.exponential(
+                        Duration.ofMillis(200), Duration.ofMillis(200)))
+                .deadLetterPolicy(new DeadLetterPolicy(2, null, dlqTopic, 
null))
+                .subscribe();
+
+        // Subscribe to the DLQ via the V4 client (DLQ is a regular persistent 
topic).
+        @Cleanup
+        org.apache.pulsar.client.api.Consumer<String> dlqConsumer = 
pulsarClient
+                .newConsumer(org.apache.pulsar.client.api.Schema.STRING)
+                .topic(dlqTopic)
+                .subscriptionName("dlq-watcher")
+                .subscriptionInitialPosition(
+                        
org.apache.pulsar.client.api.SubscriptionInitialPosition.Earliest)
+                .subscribe();
+
+        producer.newMessage().value("dead").send();
+
+        // V4 DLQ kicks in only when redeliveryCount > maxRedeliverCount 
(strictly greater).
+        // With maxRedeliverCount=2, the user sees deliveries with counts 0, 
1, 2 (three
+        // total), and the fourth delivery (count=3) is intercepted and 
forwarded to DLQ.
+        for (int i = 0; i < 3; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected delivery #" + (i + 1));
+            assertEquals(msg.value(), "dead");
+            consumer.negativeAcknowledge(msg.id());
+        }
+
+        // Now it should appear on the DLQ topic.
+        org.apache.pulsar.client.api.Message<String> dlqMsg =
+                dlqConsumer.receive(10, java.util.concurrent.TimeUnit.SECONDS);
+        assertNotNull(dlqMsg, "message did not land on the DLQ topic");
+        assertEquals(dlqMsg.getValue(), "dead");
+        dlqConsumer.acknowledge(dlqMsg);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageMetadataTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageMetadataTest.java
new file mode 100644
index 00000000000..7bcecc043c8
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MessageMetadataTest.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Map;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for the {@link MessageBuilder} metadata setters: {@code 
property}/{@code properties},
+ * {@code eventTime}, {@code deliverAfter}, {@code deliverAt}. Each scenario 
sends one message
+ * with the metadata set on the producer side and asserts the consumer sees it 
back.
+ *
+ * <p>Single-segment scalable topic — the wire format is the same regardless 
of segment count
+ * so multi-segment tests don't add coverage here.
+ */
+public class V5MessageMetadataTest extends V5ClientBaseTest {
+
+    @Test
+    public void testEventTimeIsPropagated() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("event-time-sub")
+                .subscribe();
+
+        Instant eventTime = Instant.parse("2024-01-15T08:30:00Z");
+        producer.newMessage()
+                .value("with-event-time")
+                .eventTime(eventTime)
+                .send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg);
+        assertTrue(msg.eventTime().isPresent(), "consumer must see the event 
time");
+        assertEquals(msg.eventTime().get(), eventTime);
+        consumer.acknowledge(msg.id());
+    }
+
+    @Test
+    public void testEventTimeAbsentWhenNotSet() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("no-event-time-sub")
+                .subscribe();
+
+        producer.newMessage().value("no-event-time").send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg);
+        // The wire carries event time as 0 when unset; the V5 API must 
surface that as
+        // an empty Optional so users can distinguish "no event time" from 
"epoch".
+        assertTrue(msg.eventTime().isEmpty(),
+                "eventTime should be absent when the producer didn't set it, 
was: " + msg.eventTime());
+        consumer.acknowledge(msg.id());
+    }
+
+    @Test
+    public void testPropertiesPropagateOneAtATime() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("props-single-sub")
+                .subscribe();
+
+        producer.newMessage()
+                .value("with-props")
+                .property("a", "1")
+                .property("b", "two")
+                .send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg);
+        assertEquals(msg.properties(), Map.of("a", "1", "b", "two"));
+        consumer.acknowledge(msg.id());
+    }
+
+    @Test
+    public void testPropertiesPropagateAsBatch() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("props-batch-sub")
+                .subscribe();
+
+        Map<String, String> props = Map.of("k1", "v1", "k2", "v2", "k3", "v3");
+        producer.newMessage()
+                .value("with-props-batch")
+                .properties(props)
+                .send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg);
+        assertEquals(msg.properties(), props);
+        consumer.acknowledge(msg.id());
+    }
+
+    @Test
+    public void testDeliverAfterDelaysVisibility() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        // Shared subscription is required for delayed delivery.
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("delay-after-sub")
+                .subscribe();
+
+        long sentNanos = System.nanoTime();
+        producer.newMessage()
+                .value("delayed")
+                .deliverAfter(Duration.ofSeconds(2))
+                .send();
+
+        // A short poll right after send must not see the message yet.
+        assertNull(consumer.receive(Duration.ofMillis(500)),
+                "delayed message must not be visible before deliverAfter 
elapses");
+
+        // Wait long enough for the broker's redelivery tracker to release it.
+        Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+        long elapsedMs = (System.nanoTime() - sentNanos) / 1_000_000;
+        assertNotNull(msg, "delayed message did not arrive after delay");
+        assertEquals(msg.value(), "delayed");
+        assertTrue(elapsedMs >= 1500,
+                "deliverAfter delivered too early: " + elapsedMs + "ms");
+        consumer.acknowledge(msg.id());
+    }
+
+    @Test
+    public void testDeliverAtDelaysVisibility() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("delay-at-sub")
+                .subscribe();
+
+        Instant deliverAt = Instant.now().plusSeconds(2);
+        producer.newMessage()
+                .value("delayed-at")
+                .deliverAt(deliverAt)
+                .send();
+
+        assertNull(consumer.receive(Duration.ofMillis(500)),
+                "deliverAt must not deliver before the timestamp");
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(10));
+        assertNotNull(msg, "deliverAt did not deliver");
+        assertEquals(msg.value(), "delayed-at");
+        // Allow 500ms slack on the lower bound for clock-skew / scheduling 
jitter.
+        assertTrue(Instant.now().isAfter(deliverAt.minusMillis(500)),
+                "delivered earlier than the configured timestamp");
+        consumer.acknowledge(msg.id());
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerAccessModeTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerAccessModeTest.java
new file mode 100644
index 00000000000..7424ae30fa2
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerAccessModeTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.ProducerAccessMode;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Producer {@link ProducerAccessMode} coverage:
+ * <ul>
+ *   <li>SHARED is the default — multiple producers can coexist.</li>
+ *   <li>EXCLUSIVE — first producer succeeds, a second EXCLUSIVE attempt fails 
fast
+ *       with {@code ProducerBusyException}.</li>
+ *   <li>WAIT_FOR_EXCLUSIVE — second producer blocks until the first closes, 
then
+ *       takes over.</li>
+ * </ul>
+ */
+public class V5ProducerAccessModeTest extends V5ClientBaseTest {
+
+    @Test
+    public void testSharedAllowsMultipleProducers() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> p1 = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .accessMode(ProducerAccessMode.SHARED)
+                .create();
+        @Cleanup
+        Producer<String> p2 = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .accessMode(ProducerAccessMode.SHARED)
+                .create();
+
+        // Both producers are usable concurrently.
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("shared-sub")
+                .subscribe();
+
+        p1.newMessage().value("from-p1").send();
+        p2.newMessage().value("from-p2").send();
+
+        Message<String> first = consumer.receive(Duration.ofSeconds(5));
+        Message<String> second = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(first);
+        assertNotNull(second);
+        consumer.acknowledge(first.id());
+        consumer.acknowledge(second.id());
+    }
+
+    @Test
+    public void testExclusiveRejectsSecondProducer() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> first = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .accessMode(ProducerAccessMode.EXCLUSIVE)
+                .create();
+
+        // Second EXCLUSIVE on the same topic must fail at create() — the V5 
builder
+        // eagerly claims every active segment up front for exclusive access 
modes.
+        try {
+            v5Client.newProducer(Schema.string())
+                    .topic(topic)
+                    .accessMode(ProducerAccessMode.EXCLUSIVE)
+                    .create();
+            fail("a second EXCLUSIVE producer should not be able to attach");
+        } catch (PulsarClientException expected) {
+            String msg = expected.getMessage() == null ? "" : 
expected.getMessage();
+            assertTrue(msg.contains("Busy") || msg.contains("Fenced")
+                            || msg.contains("ProducerFenced") || 
msg.contains("ProducerBusy"),
+                    "unexpected error message for second EXCLUSIVE producer: " 
+ msg);
+        }
+
+        // The first producer is still healthy and can send.
+        first.newMessage().value("still-here").send();
+    }
+
+    @Test
+    public void testWaitForExclusiveSucceedsAfterFirstReleases() throws 
Exception {
+        String topic = newScalableTopic(1);
+
+        Producer<String> first = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .accessMode(ProducerAccessMode.EXCLUSIVE)
+                .create();
+        first.newMessage().value("first-claim").send();
+
+        // Start a WAIT_FOR_EXCLUSIVE create() on a separate thread — it 
should block
+        // until the first producer releases. We can't observe blocking 
directly, so we
+        // assert it has not completed before close, then completes once close 
lands.
+        java.util.concurrent.CompletableFuture<Producer<String>> secondFuture =
+                v5Client.newProducer(Schema.string())
+                        .topic(topic)
+                        .accessMode(ProducerAccessMode.WAIT_FOR_EXCLUSIVE)
+                        .createAsync();
+        Thread.sleep(500);
+        org.testng.Assert.assertFalse(secondFuture.isDone(),
+                "WAIT_FOR_EXCLUSIVE create() must block while the first 
producer holds the claim");
+
+        first.close();
+
+        @Cleanup
+        Producer<String> second = secondFuture.get(10, 
java.util.concurrent.TimeUnit.SECONDS);
+        MessageId id = second.newMessage().value("after-takeover").send();
+        assertNotNull(id);
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
new file mode 100644
index 00000000000..7e816b0663e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerBatchingTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Producer batching coverage. Default / disabled / custom batching policies 
all have to
+ * deliver the same observable end-to-end result: every produced message lands 
at the
+ * consumer in send order with the right value.
+ *
+ * <p>Single-segment topic so message order is fully determined by the 
producer side.
+ */
+public class V5ProducerBatchingTest extends V5ClientBaseTest {
+
+    private void produceAndVerify(BatchingPolicy policy, int count, String 
subSuffix)
+            throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .batchingPolicy(policy)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("batching-" + subSuffix)
+                .subscribe();
+
+        // Use the async producer so we don't pay the round-trip per message — 
that way
+        // batching can actually batch when enabled.
+        CompletableFuture<?>[] sends = new CompletableFuture<?>[count];
+        for (int i = 0; i < count; i++) {
+            sends[i] = producer.async().newMessage().value("v-" + i).send();
+        }
+        producer.async().flush().get(5, TimeUnit.SECONDS);
+        for (CompletableFuture<?> send : sends) {
+            assertNotNull(send.get(5, TimeUnit.SECONDS));
+        }
+
+        Set<String> received = new HashSet<>();
+        for (int i = 0; i < count; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "expected message " + i + " of " + count);
+            received.add(msg.value());
+            consumer.acknowledge(msg.id());
+        }
+        assertEquals(received.size(), count, "duplicate or missing messages");
+    }
+
+    @Test
+    public void testDefaultBatchingDeliversAllMessages() throws Exception {
+        produceAndVerify(BatchingPolicy.ofDefault(), 100, "default");
+    }
+
+    @Test
+    public void testDisabledBatchingDeliversAllMessages() throws Exception {
+        produceAndVerify(BatchingPolicy.ofDisabled(), 100, "disabled");
+    }
+
+    @Test
+    public void testTightBatchingByDelay() throws Exception {
+        // Small max-publish-delay forces batches to flush quickly; both ends 
still see
+        // every message in order.
+        BatchingPolicy tight = BatchingPolicy.of(
+                Duration.ofMillis(5), 100, MemorySize.ofMegabytes(1));
+        produceAndVerify(tight, 50, "tight-delay");
+    }
+
+    @Test
+    public void testBatchingWithSmallBatchSize() throws Exception {
+        // Cap the batch at 5 messages — exercises the maxMessages branch of 
the batching
+        // packer so a 50-message stream gets cut into 10 batches.
+        BatchingPolicy small = BatchingPolicy.of(
+                Duration.ofSeconds(1), 5, MemorySize.ofMegabytes(1));
+        produceAndVerify(small, 50, "small-batch");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerCompressionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerCompressionTest.java
new file mode 100644
index 00000000000..e8521bf471e
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerCompressionTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.CompressionPolicy;
+import org.apache.pulsar.client.api.v5.config.CompressionType;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Producer-side compression coverage. For each {@link CompressionType} (plus 
the
+ * disabled / default cases), produce a payload that's redundant enough to 
actually
+ * compress, then read it back through a V5 consumer and confirm the 
decompression
+ * yields the original value.
+ */
+public class V5ProducerCompressionTest extends V5ClientBaseTest {
+
+    private static final String PAYLOAD = "x".repeat(4096);
+
+    private void roundtripWithPolicy(CompressionPolicy policy, String 
subSuffix) throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .compressionPolicy(policy)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("compression-" + subSuffix)
+                .subscribe();
+
+        producer.newMessage().value(PAYLOAD).send();
+
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "compressed message did not arrive within 5s");
+        assertEquals(msg.value(), PAYLOAD,
+                "decompressed value does not match what was sent");
+        consumer.acknowledge(msg.id());
+    }
+
+    @Test
+    public void testCompressionDisabled() throws Exception {
+        roundtripWithPolicy(CompressionPolicy.disabled(), "disabled");
+    }
+
+    @Test
+    public void testCompressionLz4() throws Exception {
+        roundtripWithPolicy(CompressionPolicy.of(CompressionType.LZ4), "lz4");
+    }
+
+    @Test
+    public void testCompressionZlib() throws Exception {
+        roundtripWithPolicy(CompressionPolicy.of(CompressionType.ZLIB), 
"zlib");
+    }
+
+    @Test
+    public void testCompressionZstd() throws Exception {
+        roundtripWithPolicy(CompressionPolicy.of(CompressionType.ZSTD), 
"zstd");
+    }
+
+    @Test
+    public void testCompressionSnappy() throws Exception {
+        roundtripWithPolicy(CompressionPolicy.of(CompressionType.SNAPPY), 
"snappy");
+    }
+
+    @Test
+    public void testNoCompressionPolicySetUsesDefault() throws Exception {
+        // Skip the helper — exercise the "didn't call compressionPolicy()" 
path.
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("compression-default-sub")
+                .subscribe();
+
+        producer.newMessage().value(PAYLOAD).send();
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg);
+        assertEquals(msg.value(), PAYLOAD);
+        consumer.acknowledge(msg.id());
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSequenceIdTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSequenceIdTest.java
new file mode 100644
index 00000000000..f2e9da177ff
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ProducerSequenceIdTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Producer sequence-id behavior: {@link Producer#lastSequenceId()},
+ * {@link ProducerBuilder#initialSequenceId(long)}, and explicit per-message
+ * {@link MessageMetadata#sequenceId(long)}.
+ *
+ * <p>Single-segment scalable topic with broker-side deduplication enabled by 
the shared
+ * cluster (see {@code 
SharedPulsarCluster.setBrokerDeduplicationEnabled(true)}), so
+ * sending the same {@code sequenceId} twice gives the producer a no-op on the 
second
+ * call.
+ */
+public class V5ProducerSequenceIdTest extends V5ClientBaseTest {
+
+    @Test
+    public void testLastSequenceIdAdvancesAsMessagesAreSent() throws Exception 
{
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .producerName("seq-1")
+                .create();
+
+        // Before any send, lastSequenceId is "no message yet" (-1).
+        assertEquals(producer.lastSequenceId(), -1L,
+                "lastSequenceId before first send must be -1");
+
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value("v-" + i).send();
+        }
+        // Without explicit sequenceId, the producer auto-increments from 0; 
after 5
+        // sends the last one had id 4.
+        assertEquals(producer.lastSequenceId(), 4L);
+    }
+
+    @Test
+    public void testInitialSequenceIdShiftsTheStartingPoint() throws Exception 
{
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .producerName("seq-2")
+                .initialSequenceId(99)
+                .create();
+
+        // Before send: lastSequenceId reflects the configured initial.
+        assertEquals(producer.lastSequenceId(), 99L,
+                "initialSequenceId must seed lastSequenceId");
+
+        producer.newMessage().value("post-init").send();
+        assertEquals(producer.lastSequenceId(), 100L,
+                "next message after initialSequenceId=99 should have id 100");
+    }
+
+    @Test
+    public void testExplicitSequenceIdIsHonored() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .producerName("seq-3")
+                .create();
+
+        producer.newMessage().value("explicit").sequenceId(123L).send();
+        assertEquals(producer.lastSequenceId(), 123L);
+    }
+
+    @Test
+    public void testDeduplicationDropsRepeatedSequenceId() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .producerName("seq-dedup")
+                .create();
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("dedup-sub")
+                .subscribe();
+
+        // Same sequenceId twice — the broker must accept the first and dedup 
the second.
+        // Both sends complete without an exception (the second returns a 
sentinel message
+        // id; we don't pin the exact value because that's a broker-internal 
contract).
+        MessageId firstId = 
producer.newMessage().value("once").sequenceId(7L).send();
+        assertNotNull(firstId);
+        producer.newMessage().value("once-dup").sequenceId(7L).send();
+
+        // Observable behavior: consumer sees the first message exactly once.
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg);
+        assertEquals(msg.value(), "once");
+        consumer.acknowledge(msg.id());
+
+        assertNull(consumer.receive(Duration.ofMillis(200)),
+                "duplicate sequenceId must not produce a second 
consumer-visible message");
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SchemaRoundtripTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SchemaRoundtripTest.java
new file mode 100644
index 00000000000..7a79ddd5d8a
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SchemaRoundtripTest.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import java.util.Objects;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.schema.proto.Test.TestEnum;
+import org.apache.pulsar.client.api.schema.proto.Test.TestMessage;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Roundtrip every {@code Schema.*} factory through the V5 producer / consumer 
wire path
+ * on a single-segment scalable topic. One test per schema; each sends a 
sentinel value
+ * and asserts the consumer reads it back unchanged.
+ *
+ * <p>{@code autoProduceBytes} requires a more elaborate setup (broker-side 
schema must
+ * already be registered for the topic) and is covered separately under 
producer knobs.
+ */
+public class V5SchemaRoundtripTest extends V5ClientBaseTest {
+
+    private <T> T roundtrip(Schema<T> schema, T value) throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<T> producer = v5Client.newProducer(schema)
+                .topic(topic)
+                .create();
+        @Cleanup
+        QueueConsumer<T> consumer = v5Client.newQueueConsumer(schema)
+                .topic(topic)
+                .subscriptionName("schema-sub")
+                .subscribe();
+
+        producer.newMessage().value(value).send();
+        Message<T> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg, "message did not arrive within 5s");
+        T received = msg.value();
+        consumer.acknowledge(msg.id());
+        return received;
+    }
+
+    @Test
+    public void testBytes() throws Exception {
+        byte[] sent = "hello-bytes".getBytes();
+        byte[] got = roundtrip(Schema.bytes(), sent);
+        assertEquals(got, sent);
+    }
+
+    @Test
+    public void testString() throws Exception {
+        assertEquals(roundtrip(Schema.string(), "hello-string"), 
"hello-string");
+    }
+
+    @Test
+    public void testBool() throws Exception {
+        assertEquals(roundtrip(Schema.bool(), Boolean.TRUE), Boolean.TRUE);
+    }
+
+    @Test
+    public void testInt8() throws Exception {
+        assertEquals(roundtrip(Schema.int8(), (byte) -7), (byte) -7);
+    }
+
+    @Test
+    public void testInt16() throws Exception {
+        assertEquals(roundtrip(Schema.int16(), (short) 12345), (short) 12345);
+    }
+
+    @Test
+    public void testInt32() throws Exception {
+        assertEquals(roundtrip(Schema.int32(), 123_456_789), 
Integer.valueOf(123_456_789));
+    }
+
+    @Test
+    public void testInt64() throws Exception {
+        long v = 1234567890123L;
+        assertEquals(roundtrip(Schema.int64(), v), Long.valueOf(v));
+    }
+
+    @Test
+    public void testFloat32() throws Exception {
+        assertEquals(roundtrip(Schema.float32(), 3.14f), 3.14f, 1e-6f);
+    }
+
+    @Test
+    public void testFloat64() throws Exception {
+        assertEquals(roundtrip(Schema.float64(), 2.718281828d), 2.718281828d, 
1e-9);
+    }
+
+    @Test
+    public void testJson() throws Exception {
+        Pojo sent = new Pojo("alice", 30);
+        Pojo got = roundtrip(Schema.json(Pojo.class), sent);
+        assertEquals(got, sent);
+    }
+
+    @Test
+    public void testAvro() throws Exception {
+        Pojo sent = new Pojo("bob", 42);
+        Pojo got = roundtrip(Schema.avro(Pojo.class), sent);
+        assertEquals(got, sent);
+    }
+
+    @Test
+    public void testProtobuf() throws Exception {
+        TestMessage sent = TestMessage.newBuilder()
+                .setStringField("proto-roundtrip")
+                .setDoubleField(3.14159)
+                .setIntField(42)
+                .setTestEnum(TestEnum.SHARED)
+                .build();
+        TestMessage got = roundtrip(Schema.protobuf(TestMessage.class), sent);
+        assertEquals(got.getStringField(), "proto-roundtrip");
+        assertEquals(got.getDoubleField(), 3.14159, 1e-9);
+        assertEquals(got.getIntField(), 42);
+        assertEquals(got.getTestEnum(), TestEnum.SHARED);
+    }
+
+    /**
+     * Tiny POJO for json/avro schema roundtrips. Public no-args + 
getters/setters
+     * keep both Jackson (json) and Avro reflection happy.
+     */
+    public static class Pojo {
+        private String name;
+        private int age;
+
+        public Pojo() {
+        }
+
+        public Pojo(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public int getAge() {
+            return age;
+        }
+
+        public void setAge(int age) {
+            this.age = age;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof Pojo other)) {
+                return false;
+            }
+            return age == other.age && Objects.equals(name, other.name);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, age);
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SubscriptionInitialPositionTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SubscriptionInitialPositionTest.java
new file mode 100644
index 00000000000..a4a16f5015d
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SubscriptionInitialPositionTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import java.time.Duration;
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Coverage for {@code subscriptionInitialPosition}. For each consumer type 
that takes
+ * the option (Queue, Stream — the CheckpointConsumer uses {@code 
startPosition} instead,
+ * tested separately), assert that EARLIEST sees prior data and LATEST does 
not.
+ */
+public class V5SubscriptionInitialPositionTest extends V5ClientBaseTest {
+
+    @Test
+    public void testQueueConsumerEarliestSeesPriorMessages() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+
+        // Pre-existing data.
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value("pre-" + i).send();
+        }
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("queue-earliest-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "EARLIEST queue consumer must see pre-existing 
pre-" + i);
+            assertEquals(msg.value(), "pre-" + i);
+            consumer.acknowledge(msg.id());
+        }
+    }
+
+    @Test
+    public void testQueueConsumerLatestSkipsPriorMessages() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value("pre-" + i).send();
+        }
+
+        @Cleanup
+        QueueConsumer<String> consumer = 
v5Client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("queue-latest-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.LATEST)
+                .subscribe();
+
+        // Idle for now — pre-existing messages must be invisible.
+        assertNull(consumer.receive(Duration.ofMillis(200)),
+                "LATEST queue consumer must skip pre-existing messages");
+
+        // Anything published after subscribe is delivered.
+        producer.newMessage().value("post-1").send();
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg);
+        assertEquals(msg.value(), "post-1");
+        consumer.acknowledge(msg.id());
+    }
+
+    @Test
+    public void testStreamConsumerEarliestSeesPriorMessages() throws Exception 
{
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value("pre-" + i).send();
+        }
+
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("stream-earliest-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
+                .subscribe();
+
+        MessageId last = null;
+        for (int i = 0; i < 5; i++) {
+            Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+            assertNotNull(msg, "EARLIEST stream consumer must see pre-" + i);
+            assertEquals(msg.value(), "pre-" + i);
+            last = msg.id();
+        }
+        consumer.acknowledgeCumulative(last);
+    }
+
+    @Test
+    public void testStreamConsumerLatestSkipsPriorMessages() throws Exception {
+        String topic = newScalableTopic(1);
+
+        @Cleanup
+        Producer<String> producer = v5Client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        for (int i = 0; i < 5; i++) {
+            producer.newMessage().value("pre-" + i).send();
+        }
+
+        @Cleanup
+        StreamConsumer<String> consumer = 
v5Client.newStreamConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("stream-latest-sub")
+                
.subscriptionInitialPosition(SubscriptionInitialPosition.LATEST)
+                .subscribe();
+
+        assertNull(consumer.receive(Duration.ofMillis(200)),
+                "LATEST stream consumer must skip pre-existing messages");
+
+        producer.newMessage().value("post-1").send();
+        Message<String> msg = consumer.receive(Duration.ofSeconds(5));
+        assertNotNull(msg);
+        assertEquals(msg.value(), "post-1");
+        consumer.acknowledgeCumulative(msg.id());
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
index afc56c3723d..d5c261ede89 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ProducerBuilderV5.java
@@ -76,10 +76,17 @@ final class ProducerBuilderV5<T> implements 
ProducerBuilder<T> {
         DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), 
topicName);
 
         return dagWatch.start()
-                .thenApply(initialLayout -> {
+                .thenCompose(initialLayout -> {
                     ScalableTopicProducer<T> producer = new 
ScalableTopicProducer<>(
                             client, v5Schema, conf, dagWatch, initialLayout);
-                    return (Producer<T>) producer;
+                    // For exclusive access modes, claim every active segment 
up front so
+                    // a collision surfaces here instead of being deferred to 
first send.
+                    return producer.eagerAttachInitialAsync()
+                            .thenApply(__ -> (Producer<T>) producer)
+                            .exceptionallyCompose(ex -> 
producer.closeAsync().handle((__, ___) -> {
+                                throw ex instanceof 
java.util.concurrent.CompletionException ce
+                                        ? ce : new 
java.util.concurrent.CompletionException(ex);
+                            }));
                 });
     }
 
@@ -97,7 +104,16 @@ final class ProducerBuilderV5<T> implements 
ProducerBuilder<T> {
 
     @Override
     public ProducerBuilderV5<T> accessMode(ProducerAccessMode accessMode) {
-        
conf.setAccessMode(org.apache.pulsar.client.api.ProducerAccessMode.valueOf(accessMode.name()));
+        // V5 enum uses SCREAMING_SNAKE_CASE; v4 uses PascalCase, so a literal 
valueOf(name())
+        // would throw IllegalArgumentException. Map explicitly.
+        conf.setAccessMode(switch (accessMode) {
+            case SHARED -> 
org.apache.pulsar.client.api.ProducerAccessMode.Shared;
+            case EXCLUSIVE -> 
org.apache.pulsar.client.api.ProducerAccessMode.Exclusive;
+            case EXCLUSIVE_WITH_FENCING ->
+                    
org.apache.pulsar.client.api.ProducerAccessMode.ExclusiveWithFencing;
+            case WAIT_FOR_EXCLUSIVE ->
+                    
org.apache.pulsar.client.api.ProducerAccessMode.WaitForExclusive;
+        });
         return this;
     }
 
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
index 79ec430f2f6..8331b4d2fa9 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableTopicProducer.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.v5.MessageBuilder;
 import org.apache.pulsar.client.api.v5.Producer;
 import org.apache.pulsar.client.api.v5.PulsarClientException;
@@ -102,8 +103,11 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
 
     @Override
     public long lastSequenceId() {
-        // Aggregate: return the max across all segment producers
-        long max = -1;
+        // Reflect the configured initialSequenceId even before any segment 
producer has
+        // been created (segment producers are spun up lazily on first send), 
so a caller
+        // that sets initialSequenceId(N) and immediately reads 
lastSequenceId() sees N.
+        long max = producerConf.getInitialSequenceId() == null
+                ? -1L : producerConf.getInitialSequenceId();
         for (var producer : segmentProducers.values()) {
             max = Math.max(max, producer.getLastSequenceId());
         }
@@ -296,6 +300,29 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
     @Override
     public void onLayoutChange(ClientSegmentLayout newLayout, 
ClientSegmentLayout oldLayout) {
         applyLayout(newLayout);
+        // After a layout update under an exclusive access mode, we want to 
claim any
+        // newly-introduced segments eagerly so the exclusivity guarantee 
covers the
+        // whole topic, not just segments hit by the next send. Best-effort: 
this runs
+        // off the DagWatchClient callback and any failure is logged; the next 
send to
+        // that segment will surface the error via the normal 
PulsarClientException
+        // path. (The initial-create path uses {@link 
#eagerAttachInitialAsync} for
+        // strict claim.)
+        if (requiresExclusiveAttach()) {
+            CompletableFuture.runAsync(() -> {
+                for (var seg : newLayout.activeSegments()) {
+                    if (segmentProducers.containsKey(seg.segmentId())) {
+                        continue;
+                    }
+                    try {
+                        getOrCreateSegmentProducer(seg.segmentId());
+                    } catch (PulsarClientException e) {
+                        log.warn().attr("segmentId", seg.segmentId())
+                                .exceptionMessage(e)
+                                .log("Eager exclusive attach failed; will 
retry on next send");
+                    }
+                }
+            }, client.v4Client().getInternalExecutorService());
+        }
     }
 
     private void applyLayout(ClientSegmentLayout layout) {
@@ -322,11 +349,40 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
             }
         }
 
-        // New segment producers will be created lazily on first message
         log.info().attr("epoch", layout.epoch())
                 .attr("activeSegments", newSegmentIds).log("Layout applied");
     }
 
+    /**
+     * Strict variant of the eager attach used at initial create time: 
surfaces any
+     * exclusivity failure as a {@link PulsarClientException} so {@code 
create()} fails
+     * up front instead of silently deferring the collision to first send.
+     */
+    CompletableFuture<Void> eagerAttachInitialAsync() {
+        if (!requiresExclusiveAttach()) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return CompletableFuture.runAsync(() -> {
+            for (var seg : activeSegments) {
+                if (segmentProducers.containsKey(seg.segmentId())) {
+                    continue;
+                }
+                try {
+                    getOrCreateSegmentProducer(seg.segmentId());
+                } catch (PulsarClientException e) {
+                    throw new java.util.concurrent.CompletionException(e);
+                }
+            }
+        }, client.v4Client().getInternalExecutorService());
+    }
+
+    private boolean requiresExclusiveAttach() {
+        ProducerAccessMode mode = producerConf.getAccessMode();
+        return mode == ProducerAccessMode.Exclusive
+                || mode == ProducerAccessMode.ExclusiveWithFencing
+                || mode == ProducerAccessMode.WaitForExclusive;
+    }
+
     // --- Internal ---
 
     private long routeMessage(String key) {
@@ -345,34 +401,50 @@ final class ScalableTopicProducer<T> implements 
Producer<T>, DagWatchClient.Layo
             return existing;
         }
 
-        return segmentProducers.computeIfAbsent(segmentId, id -> {
-            // Find the segment topic name
-            String segmentTopicName = null;
-            for (var seg : activeSegments) {
-                if (seg.segmentId() == id) {
-                    segmentTopicName = seg.segmentTopicName();
-                    break;
+        try {
+            return segmentProducers.computeIfAbsent(segmentId, id -> {
+                // Find the segment topic name
+                String segmentTopicName = null;
+                for (var seg : activeSegments) {
+                    if (seg.segmentId() == id) {
+                        segmentTopicName = seg.segmentTopicName();
+                        break;
+                    }
+                }
+                if (segmentTopicName == null) {
+                    throw new RuntimeException("Segment " + id + " not found 
in active segments");
                 }
-            }
-            if (segmentTopicName == null) {
-                throw new RuntimeException("Segment " + id + " not found in 
active segments");
-            }
 
-            try {
-                PulsarClientImpl v4Client = client.v4Client();
-                var segConf = new 
org.apache.pulsar.client.impl.conf.ProducerConfigurationData();
-                segConf.setTopicName(segmentTopicName);
-                segConf.setSendTimeoutMs(producerConf.getSendTimeoutMs());
-                segConf.setBlockIfQueueFull(producerConf.isBlockIfQueueFull());
-                if (producerConf.getProducerName() != null
-                        && !producerConf.getProducerName().isEmpty()) {
-                    segConf.setProducerName(producerConf.getProducerName() + 
"-seg-" + id);
+                try {
+                    PulsarClientImpl v4Client = client.v4Client();
+                    // Clone the user-facing producer config so per-segment 
producers inherit
+                    // every builder knob (compression, batching, chunking, 
encryption,
+                    // initialSequenceId, accessMode, properties, ...) and not 
just the few
+                    // fields explicitly carried over.
+                    var segConf = producerConf.clone();
+                    segConf.setTopicName(segmentTopicName);
+                    if (producerConf.getProducerName() != null
+                            && !producerConf.getProducerName().isEmpty()) {
+                        segConf.setProducerName(producerConf.getProducerName() 
+ "-seg-" + id);
+                    }
+                    return v4Client.createSegmentProducerAsync(segConf, 
v4Schema)
+                            .get();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
-                return v4Client.createSegmentProducerAsync(segConf, v4Schema)
-                        .get();
-            } catch (Exception e) {
-                throw new RuntimeException(e);
+            });
+        } catch (RuntimeException re) {
+            // computeIfAbsent can't throw checked exceptions; unwrap a v4 
PulsarClientException
+            // and rethrow as the V5 type so callers see the contract they 
expect (and don't
+            // get a misleading bare RuntimeException for a producer-fenced / 
busy segment).
+            Throwable cause = re.getCause();
+            while (cause instanceof java.util.concurrent.ExecutionException && 
cause.getCause() != null) {
+                cause = cause.getCause();
+            }
+            if (cause instanceof 
org.apache.pulsar.client.api.PulsarClientException v4Exc) {
+                throw new PulsarClientException(v4Exc.getMessage(), v4Exc);
             }
-        });
+            throw re;
+        }
     }
 }

Reply via email to