This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7b49b385efc [feat] PIP-468: V5 client end-to-end smoke test +
segment-bypass for internal subscribe/read (#25586)
7b49b385efc is described below
commit 7b49b385efc2ec9aec28f6193db91e017d086be2
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Apr 27 13:30:15 2026 -0700
[feat] PIP-468: V5 client end-to-end smoke test + segment-bypass for
internal subscribe/read (#25586)
---
pulsar-broker/build.gradle.kts | 2 +
.../pulsar/client/api/v5/V5ClientBaseTest.java | 88 ++++++++++++++++++++++
.../apache/pulsar/client/api/v5/V5SmokeTest.java | 60 +++++++++++++++
.../client/impl/v5/ScalableCheckpointConsumer.java | 10 +--
.../client/impl/v5/ScalableQueueConsumer.java | 12 +--
.../client/impl/v5/ScalableStreamConsumer.java | 12 +--
.../pulsar/client/impl/PulsarClientImpl.java | 64 ++++++++++++++++
7 files changed, 231 insertions(+), 17 deletions(-)
diff --git a/pulsar-broker/build.gradle.kts b/pulsar-broker/build.gradle.kts
index 11ff27ea98b..38c5b917932 100644
--- a/pulsar-broker/build.gradle.kts
+++ b/pulsar-broker/build.gradle.kts
@@ -111,6 +111,8 @@ dependencies {
testImplementation(project(path =
":pulsar-package-management:pulsar-package-core", configuration = "testJar"))
testImplementation(libs.bookkeeper.common) { artifact { classifier =
"tests" } }
testImplementation(libs.zookeeper) { artifact { classifier = "tests" } }
+ testImplementation(project(":pulsar-client-v5"))
+ testImplementation(project(":pulsar-client-api-v5"))
testImplementation(project(":pulsar-functions:pulsar-functions-local-runner-original"))
testImplementation(project(":pulsar-functions:pulsar-functions-api-examples"))
testImplementation(project(":pulsar-io:pulsar-io-batch-discovery-triggerers"))
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
new file mode 100644
index 00000000000..586d654743f
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5ClientBaseTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.pulsar.broker.service.SharedPulsarBaseTest;
+import org.testng.annotations.AfterMethod;
+
+/**
+ * Base class for V5 client end-to-end tests.
+ *
+ * <p>Extends {@link SharedPulsarBaseTest} (one shared in-memory broker per
JVM, fresh
+ * namespace per test method) and adds:
+ * <ul>
+ * <li>{@link #newV5Client()} — V5 PulsarClient against the shared
broker.</li>
+ * <li>{@link #newScalableTopic(int)} — creates a {@code topic://...}
scalable topic with the
+ * given number of initial segments and returns its name.</li>
+ * <li>Auto-tracking of clients/producers/consumers created during a test
method, closed in
+ * {@link #closeV5Resources()} after the test.</li>
+ * </ul>
+ */
+public abstract class V5ClientBaseTest extends SharedPulsarBaseTest {
+
+ private final List<AutoCloseable> v5Resources = new ArrayList<>();
+
+ /**
+ * Build a fresh V5 PulsarClient connected to the shared cluster's binary
service URL.
+ * The returned client is registered for automatic close in {@link
#closeV5Resources()}.
+ */
+ protected PulsarClient newV5Client() throws Exception {
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(getBrokerServiceUrl())
+ .build();
+ track(client);
+ return client;
+ }
+
+ /**
+ * Create a scalable topic with the given number of initial segments under
the current
+ * test namespace and return its fully-qualified {@code topic://...} name.
+ */
+ protected String newScalableTopic(int numInitialSegments) throws Exception
{
+ String name = "topic://" + getNamespace() + "/scalable-"
+ + UUID.randomUUID().toString().substring(0, 8);
+ admin.scalableTopics().createScalableTopic(name, numInitialSegments);
+ return name;
+ }
+
+ /**
+ * Register an arbitrary {@link AutoCloseable} for automatic close after
the test.
+ */
+ protected <T extends AutoCloseable> T track(T closeable) {
+ v5Resources.add(closeable);
+ return closeable;
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void closeV5Resources() {
+ // Close in reverse order: consumers/producers before the client they
belong to.
+ for (int i = v5Resources.size() - 1; i >= 0; i--) {
+ AutoCloseable c = v5Resources.get(i);
+ try {
+ c.close();
+ } catch (Exception ignored) {
+ // Best-effort cleanup; tests have already asserted what they
care about.
+ }
+ }
+ v5Resources.clear();
+ }
+}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SmokeTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SmokeTest.java
new file mode 100644
index 00000000000..f613671f334
--- /dev/null
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5SmokeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.time.Duration;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.testng.annotations.Test;
+
+/**
+ * Smoke test: end-to-end verification that the longest V5 path works against
a real broker.
+ *
+ * <p>Exercises: admin {@code createScalableTopic} → {@code DagWatchClient}
session → segment
+ * lookup → per-segment v4 producer creation → wire-format send → segment v4
consumer attach →
+ * receive → ack on the V5 {@link QueueConsumer}.
+ */
+public class V5SmokeTest extends V5ClientBaseTest {
+
+ @Test
+ public void testProduceAndConsumeOneMessageOnSingleSegmentTopic() throws
Exception {
+ String topic = newScalableTopic(1);
+ PulsarClient client = newV5Client();
+
+ Producer<String> producer = client.newProducer(Schema.string())
+ .topic(topic)
+ .create();
+ track(producer);
+
+ QueueConsumer<String> consumer =
client.newQueueConsumer(Schema.string())
+ .topic(topic)
+ .subscriptionName("smoke-sub")
+ .subscribe();
+ track(consumer);
+
+ MessageId sentId =
producer.newMessage().value("hello-pulsar-v5").send();
+ assertNotNull(sentId, "producer must return a message id");
+
+ Message<String> received = consumer.receive(Duration.ofSeconds(10));
+ assertNotNull(received, "consumer must receive within timeout");
+ assertEquals(received.value(), "hello-pulsar-v5");
+ consumer.acknowledge(received.id());
+ }
+}
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
index d7b205f79e2..534f885663d 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableCheckpointConsumer.java
@@ -316,14 +316,14 @@ final class ScalableCheckpointConsumer<T> implements
CheckpointConsumer<T>, DagW
PulsarClientImpl v4Client = client.v4Client();
org.apache.pulsar.client.api.MessageId startMsgId =
resolveStartPosition(segment.segmentId());
- var builder = v4Client.newReader(v4Schema)
- .topic(segment.segmentTopicName())
- .startMessageId(startMsgId);
+ var segConf = new
org.apache.pulsar.client.impl.conf.ReaderConfigurationData<T>();
+ segConf.getTopicNames().add(segment.segmentTopicName());
+ segConf.setStartMessageId(startMsgId);
if (consumerName != null) {
- builder.readerName(consumerName + "-seg-" + segment.segmentId());
+ segConf.setReaderName(consumerName + "-seg-" +
segment.segmentId());
}
- return builder.createAsync()
+ return v4Client.createSegmentReaderAsync(segConf, v4Schema)
.thenApply(reader -> {
startReadLoop(reader, segment.segmentId());
return reader;
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
index 28b6ba8af6b..4a1dae8bcdf 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java
@@ -317,14 +317,14 @@ final class ScalableQueueConsumer<T> implements
QueueConsumer<T>, DagWatchClient
private CompletableFuture<org.apache.pulsar.client.api.Consumer<T>>
createSegmentConsumerAsync(
ActiveSegment segment) {
PulsarClientImpl v4Client = client.v4Client();
- var builder = v4Client.newConsumer(v4Schema)
- .topic(segment.segmentTopicName())
- .subscriptionName(subscriptionName)
- .subscriptionType(SubscriptionType.Shared);
+ var segConf = new
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData<T>();
+ segConf.getTopicNames().add(segment.segmentTopicName());
+ segConf.setSubscriptionName(subscriptionName);
+ segConf.setSubscriptionType(SubscriptionType.Shared);
if (consumerConf.getConsumerName() != null) {
- builder.consumerName(consumerConf.getConsumerName() + "-seg-" +
segment.segmentId());
+ segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" +
segment.segmentId());
}
- return builder.subscribeAsync()
+ return v4Client.subscribeSegmentAsync(segConf, v4Schema)
.thenApply(consumer -> {
startReceiveLoop(consumer, segment.segmentId());
return consumer;
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
index 2642e0f4d57..5137ce0462f 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java
@@ -305,14 +305,14 @@ final class ScalableStreamConsumer<T> implements
StreamConsumer<T>, DagWatchClie
private CompletableFuture<org.apache.pulsar.client.api.Consumer<T>>
createSegmentConsumerAsync(
ActiveSegment segment) {
PulsarClientImpl v4Client = client.v4Client();
- var builder = v4Client.newConsumer(v4Schema)
- .topic(segment.segmentTopicName())
- .subscriptionName(subscriptionName)
- .subscriptionType(SubscriptionType.Exclusive);
+ var segConf = new
org.apache.pulsar.client.impl.conf.ConsumerConfigurationData<T>();
+ segConf.getTopicNames().add(segment.segmentTopicName());
+ segConf.setSubscriptionName(subscriptionName);
+ segConf.setSubscriptionType(SubscriptionType.Exclusive);
if (consumerConf.getConsumerName() != null) {
- builder.consumerName(consumerConf.getConsumerName() + "-seg-" +
segment.segmentId());
+ segConf.setConsumerName(consumerConf.getConsumerName() + "-seg-" +
segment.segmentId());
}
- return builder.subscribeAsync()
+ return v4Client.subscribeSegmentAsync(segConf, v4Schema)
.thenApply(consumer -> {
startReceiveLoop(consumer, segment.segmentId());
return consumer;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ca3343fd3ae..baf6198420e 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -502,6 +502,70 @@ public class PulsarClientImpl implements PulsarClient {
return createProducerAsync(topic, conf, schema, null);
}
+ /**
+ * Create a reader against a segment topic bypassing the scalable domain
check.
+ * This is intended for internal use by the V5 {@code CheckpointConsumer}
to read each
+ * segment's underlying {@code segment://} topic.
+ */
+ public <T> CompletableFuture<Reader<T>>
createSegmentReaderAsync(ReaderConfigurationData<T> conf,
+
Schema<T> schema) {
+ if (state.get() != State.Open) {
+ return FutureUtil.failedFuture(new
PulsarClientException.AlreadyClosedException("Client already closed"));
+ }
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new
PulsarClientException.InvalidConfigurationException("Reader configuration
undefined"));
+ }
+ if (conf.getTopicNames().size() != 1) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException(
+ "createSegmentReaderAsync requires exactly one
topic, got "
+ + conf.getTopicNames().size()));
+ }
+ String topic = conf.getTopicName();
+ if (!TopicName.isValid(topic)) {
+ return FutureUtil.failedFuture(
+ new
PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic
+ "'"));
+ }
+ if (conf.getStartMessageId() == null) {
+ return FutureUtil.failedFuture(
+ new
PulsarClientException.InvalidConfigurationException("Invalid startMessageId"));
+ }
+ return preProcessSchemaBeforeSubscribe(this, schema, topic)
+ .thenCompose(schemaClone -> createSingleTopicReaderAsync(conf,
schemaClone));
+ }
+
+ /**
+ * Subscribe to a segment topic bypassing the scalable domain check.
+ * This is intended for internal use by the V5 client to subscribe to
per-segment v4
+ * topics for the {@code segment://} backing topics it owns.
+ */
+ public <T> CompletableFuture<Consumer<T>>
subscribeSegmentAsync(ConsumerConfigurationData<T> conf,
+ Schema<T>
schema) {
+ if (state.get() != State.Open) {
+ return FutureUtil.failedFuture(new
PulsarClientException.AlreadyClosedException("Client already closed"));
+ }
+ if (conf == null) {
+ return FutureUtil.failedFuture(
+ new
PulsarClientException.InvalidConfigurationException("Consumer configuration
undefined"));
+ }
+ if (conf.getTopicNames().size() != 1) {
+ return FutureUtil.failedFuture(
+ new PulsarClientException.InvalidConfigurationException(
+ "subscribeSegmentAsync requires exactly one topic,
got " + conf.getTopicNames().size()));
+ }
+ String topic = conf.getSingleTopic();
+ if (!TopicName.isValid(topic)) {
+ return FutureUtil.failedFuture(
+ new
PulsarClientException.InvalidTopicNameException("Invalid topic name: '" + topic
+ "'"));
+ }
+ if (isBlank(conf.getSubscriptionName())) {
+ return FutureUtil.failedFuture(
+ new
PulsarClientException.InvalidConfigurationException("Empty subscription name"));
+ }
+ return singleTopicSubscribeAsync(conf, schema, null);
+ }
+
/**
* Reject {@code topic://} (PIP-460 scalable topics) and {@code
segment://} (the internal
* backing-topic domain used by V5 scalable topics). Users on the V4 SDK
must switch to the