This is an automated email from the ASF dual-hosted git repository.

merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b49b385efc [feat] PIP-468: V5 client end-to-end smoke test + 
segment-bypass for internal subscribe/read (#25586)
7b49b385efc is described below

commit 7b49b385efc2ec9aec28f6193db91e017d086be2
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 27 13:30:15 2026 -0700

    [feat] PIP-468: V5 client end-to-end smoke test + segment-bypass for 
internal subscribe/read (#25586)
---
 pulsar-broker/build.gradle.kts                     |  2 +
 .../pulsar/client/api/v5/V5ClientBaseTest.java     | 88 ++++++++++++++++++++++
 .../apache/pulsar/client/api/v5/V5SmokeTest.java   | 60 +++++++++++++++
 .../client/impl/v5/ScalableCheckpointConsumer.java | 10 +--
 .../client/impl/v5/ScalableQueueConsumer.java      | 12 +--
 .../client/impl/v5/ScalableStreamConsumer.java     | 12 +--
 .../pulsar/client/impl/PulsarClientImpl.java       | 64 ++++++++++++++++
 7 files changed, 231 insertions(+), 17 deletions(-)

diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts
index 11ff27ea98b..38c5b917932 100644
--- a/pulsar-broker/build.gradle.kts
+++ b/pulsar-broker/build.gradle.kts
@@ -111,6 +111,8 @@ dependencies {
     testImplementation(project(path = 
":pulsar-package-management:pulsar-package-core", configuration = "testJar"))
     testImplementation(libs.bookkeeper.common) { artifact { classifier = 
"tests" } }
     testImplementation(libs.zookeeper) { artifact { classifier = "tests" } }
+    testImplementation(project(":pulsar-client-v5"))
+    testImplementation(project(":pulsar-client-api-v5"))
     
testImplementation(project(":pulsar-functions:pulsar-functions-local-runner-original"))
     
testImplementation(project(":pulsar-functions:pulsar-functions-api-examples"))
     
testImplementation(project(":pulsar-io:pulsar-io-batch-discovery-triggerers"))
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
new file mode 100644
index 00000000000..586d654743f
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.testng.annotations.AfterMethod;
+
+/**
+ * Base class for V5 client end-to-end tests.
+ *
+ * <p>Extends {@link SharedPulsarBaseTest} (one shared in-memory broker per 
JVM, fresh
+ * namespace per test method) and adds:
+ * <ul>
+ *   <li>{@link #newV5Client()} — V5 PulsarClient against the shared 
broker.</li>
+ *   <li>{@link #newScalableTopic(int)} — creates a {@code topic://...} 
scalable topic with the
+ *       given number of initial segments and returns its name.</li>
+ *   <li>Auto-tracking of clients/producers/consumers created during a test 
method, closed in
+ *       {@link #closeV5Resources()} after the test.</li>
+ * </ul>
+ */
+public abstract class V5ClientBaseTest extends SharedPulsarBaseTest {
+
+    private final List<AutoCloseable> v5Resources = new ArrayList<>();
+
+    /**
+     * Build a fresh V5 PulsarClient connected to the shared cluster's binary 
service URL.
+     * The returned client is registered for automatic close in {@link 
#closeV5Resources()}.
+     */
+    protected PulsarClient newV5Client() throws Exception {
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(getBrokerServiceUrl())
+                .build();
+        track(client);
+        return client;
+    }
+
+    /**
+     * Create a scalable topic with the given number of initial segments under 
the current
+     * test namespace and return its fully-qualified {@code topic://...} name.
+     */
+    protected String newScalableTopic(int numInitialSegments) throws Exception 
{
+        String name = "topic://" + getNamespace() + "/scalable-"
+                + UUID.randomUUID().toString().substring(0, 8);
+        admin.scalableTopics().createScalableTopic(name, numInitialSegments);
+        return name;
+    }
+
+    /**
+     * Register an arbitrary {@link AutoCloseable} for automatic close after 
the test.
+     */
+    protected <T extends AutoCloseable> T track(T closeable) {
+        v5Resources.add(closeable);
+        return closeable;
+    }
+
+    @AfterMethod(alwaysRun = true)
+    public void closeV5Resources() {
+        // Close in reverse order: consumers/producers before the client they 
belong to.
+        for (int i = v5Resources.size() - 1; i >= 0; i--) {
+            AutoCloseable c = v5Resources.get(i);
+            try {
+                c.close();
+            } catch (Exception ignored) {
+                // Best-effort cleanup; tests have already asserted what they 
care about.
+            }
+        }
+        v5Resources.clear();
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SmokeTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SmokeTest.java
new file mode 100644
index 00000000000..f613671f334
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SmokeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Smoke test: end-to-end verification that the longest V5 path works against 
a real broker.
+ *
+ * <p>Exercises: admin {@code createScalableTopic} → {@code DagWatchClient} 
session → segment
+ * lookup → per-segment v4 producer creation → wire-format send → segment v4 
consumer attach →
+ * receive → ack on the V5 {@link QueueConsumer}.
+ */
+public class V5SmokeTest extends V5ClientBaseTest {
+
+    @Test
+    public void testProduceAndConsumeOneMessageOnSingleSegmentTopic() throws 
Exception {
+        String topic = newScalableTopic(1);
+        PulsarClient client = newV5Client();
+
+        Producer<String> producer = client.newProducer(Schema.string())
+                .topic(topic)
+                .create();
+        track(producer);
+
+        QueueConsumer<String> consumer = 
client.newQueueConsumer(Schema.string())
+                .topic(topic)
+                .subscriptionName("smoke-sub")
+                .subscribe();
+        track(consumer);
+
+        MessageId sentId = 
producer.newMessage().value("hello-pulsar-v5").send();
+        assertNotNull(sentId, "producer must return a message id");
+
+        Message<String> received = consumer.receive(Duration.ofSeconds(10));
+        assertNotNull(received, "consumer must receive within timeout");
+        assertEquals(received.value(), "hello-pulsar-v5");
+        consumer.acknowledge(received.id());
+    }
+}
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index d7b205f79e2..534f885663d 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -316,14 +316,14 @@ final class ScalableCheckpointConsumer<T> implements 
CheckpointConsumer<T>, DagW
         PulsarClientImpl v4Client = client.v4Client();
         org.apache.pulsar.client.api.MessageId startMsgId = 
resolveStartPosition(segment.segmentId());
 
-        var builder = v4Client.newReader(v4Schema)
-                .topic(segment.segmentTopicName())
-                .startMessageId(startMsgId);
+        var segConf = new 
org.apache.pulsar.client.impl.conf.ReaderConfigurationData<T>();
+        segConf.getTopicNames().add(segment.segmentTopicName());
+        segConf.setStartMessageId(startMsgId);
         if (consumerName != null) {
-            builder.readerName(consumerName + "-seg-" + segment.segmentId());
+            segConf.setReaderName(consumerName + "-seg-" + 
segment.segmentId());
         }
 
-        return builder.createAsync()
+        return v4Client.createSegmentReaderAsync(segConf, v4Schema)
                 .thenApply(reader -> {
                     startReadLoop(reader, segment.segmentId());
                     return reader;
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 28b6ba8af6b..4a1dae8bcdf 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -317,14 +317,14 @@ final class ScalableQueueConsumer<T> implements 
QueueConsumer<T>, DagWatchClient
     private CompletableFuture<org.apache.pulsar.client.api.Consumer<T>> 
createSegmentConsumerAsync(
             ActiveSegment segment) {
         PulsarClientImpl v4Client = client.v4Client();
-        var builder = v4Client.newConsumer(v4Schema)
-                .topic(segment.segmentTopicName())
-                .subscriptionName(subscriptionName)
-                .subscriptionType(SubscriptionType.Shared);
+        var segConf = new 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData<T>();
+        segConf.getTopicNames().add(segment.segmentTopicName());
+        segConf.setSubscriptionName(subscriptionName);
+        segConf.setSubscriptionType(SubscriptionType.Shared);
         if (consumerConf.getConsumerName() != null) {
-            builder.consumerName(consumerConf.getConsumerName() + "-seg-" + 
segment.segmentId());
+            segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + 
segment.segmentId());
         }
-        return builder.subscribeAsync()
+        return v4Client.subscribeSegmentAsync(segConf, v4Schema)
                 .thenApply(consumer -> {
                     startReceiveLoop(consumer, segment.segmentId());
                     return consumer;
diff --git 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index 2642e0f4d57..5137ce0462f 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++ 
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -305,14 +305,14 @@ final class ScalableStreamConsumer<T> implements 
StreamConsumer<T>, DagWatchClie
     private CompletableFuture<org.apache.pulsar.client.api.Consumer<T>> 
createSegmentConsumerAsync(
             ActiveSegment segment) {
         PulsarClientImpl v4Client = client.v4Client();
-        var builder = v4Client.newConsumer(v4Schema)
-                .topic(segment.segmentTopicName())
-                .subscriptionName(subscriptionName)
-                .subscriptionType(SubscriptionType.Exclusive);
+        var segConf = new 
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData<T>();
+        segConf.getTopicNames().add(segment.segmentTopicName());
+        segConf.setSubscriptionName(subscriptionName);
+        segConf.setSubscriptionType(SubscriptionType.Exclusive);
         if (consumerConf.getConsumerName() != null) {
-            builder.consumerName(consumerConf.getConsumerName() + "-seg-" + 
segment.segmentId());
+            segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" + 
segment.segmentId());
         }
-        return builder.subscribeAsync()
+        return v4Client.subscribeSegmentAsync(segConf, v4Schema)
                 .thenApply(consumer -> {
                     startReceiveLoop(consumer, segment.segmentId());
                     return consumer;
diff --git 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ca3343fd3ae..baf6198420e 100644
--- 
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ 
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -502,6 +502,70 @@ public class PulsarClientImpl implements PulsarClient {
         return createProducerAsync(topic, conf, schema, null);
     }
 
+    /**
+     * Create a reader against a segment topic bypassing the scalable domain 
check.
+     * This is intended for internal use by the V5 {@code CheckpointConsumer} 
to read each
+     * segment's underlying {@code segment://} topic.
+     */
+    public <T> CompletableFuture<Reader<T>> 
createSegmentReaderAsync(ReaderConfigurationData<T> conf,
+                                                                      
Schema<T> schema) {
+        if (state.get() != State.Open) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed"));
+        }
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                    new 
PulsarClientException.InvalidConfigurationException("Reader configuration 
undefined"));
+        }
+        if (conf.getTopicNames().size() != 1) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException(
+                            "createSegmentReaderAsync requires exactly one 
topic, got "
+                                    + conf.getTopicNames().size()));
+        }
+        String topic = conf.getTopicName();
+        if (!TopicName.isValid(topic)) {
+            return FutureUtil.failedFuture(
+                    new 
PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic 
+ "'"));
+        }
+        if (conf.getStartMessageId() == null) {
+            return FutureUtil.failedFuture(
+                    new 
PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
+        }
+        return preProcessSchemaBeforeSubscribe(this, schema, topic)
+                .thenCompose(schemaClone -> createSingleTopicReaderAsync(conf, 
schemaClone));
+    }
+
+    /**
+     * Subscribe to a segment topic bypassing the scalable domain check.
+     * This is intended for internal use by the V5 client to subscribe to 
per-segment v4
+     * topics for the {@code segment://} backing topics it owns.
+     */
+    public <T> CompletableFuture<Consumer<T>> 
subscribeSegmentAsync(ConsumerConfigurationData<T> conf,
+                                                                     Schema<T> 
schema) {
+        if (state.get() != State.Open) {
+            return FutureUtil.failedFuture(new 
PulsarClientException.AlreadyClosedException("Client already closed"));
+        }
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                    new 
PulsarClientException.InvalidConfigurationException("Consumer configuration 
undefined"));
+        }
+        if (conf.getTopicNames().size() != 1) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.InvalidConfigurationException(
+                            "subscribeSegmentAsync requires exactly one topic, 
got " + conf.getTopicNames().size()));
+        }
+        String topic = conf.getSingleTopic();
+        if (!TopicName.isValid(topic)) {
+            return FutureUtil.failedFuture(
+                    new 
PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic 
+ "'"));
+        }
+        if (isBlank(conf.getSubscriptionName())) {
+            return FutureUtil.failedFuture(
+                    new 
PulsarClientException.InvalidConfigurationException("Empty subscription name"));
+        }
+        return singleTopicSubscribeAsync(conf, schema, null);
+    }
+
     /**
      * Reject {@code topic://} (PIP-460 scalable topics) and {@code 
segment://} (the internal
      * backing-topic domain used by V5 scalable topics). Users on the V4 SDK 
must switch to the

Reply via email to