This is an automated email from the ASF dual-hosted git repository.
merlimat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 820300f93be [improve][cli] Migrate pulsar-client to the V5 client API
(#25917)
820300f93be is described below
commit 820300f93bee6c92c0f3bc6cd169a47b02df3768
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Jun 3 15:09:35 2026 -0700
[improve][cli] Migrate pulsar-client to the V5 client API (#25917)
---
.../api/v5/internal/PulsarClientProvider.java | 4 +
.../apache/pulsar/client/api/v5/schema/Schema.java | 26 +++
.../pulsar/client/api/v5/schema/SchemaInfo.java | 18 ++
.../client/api/v5/schema/SchemaInfoRecord.java | 21 +-
pulsar-client-tools-test/build.gradle.kts | 1 +
.../client/cli/PulsarClientToolForceBatchNum.java | 212 ++++++++++++++++-----
.../pulsar/client/cli/PulsarClientToolTest.java | 37 ++--
.../pulsar/client/cli/PulsarClientToolWsTest.java | 14 +-
pulsar-client-tools/build.gradle.kts | 2 +
.../org/apache/pulsar/client/cli/AbstractCmd.java | 26 +++
.../pulsar/client/cli/AbstractCmdConsume.java | 171 ++++-------------
.../org/apache/pulsar/client/cli/CmdConsume.java | 177 ++++++++++-------
.../org/apache/pulsar/client/cli/CmdProduce.java | 204 +++++++-------------
.../java/org/apache/pulsar/client/cli/CmdRead.java | 135 ++++++-------
.../apache/pulsar/client/cli/PulsarClientTool.java | 106 +++++++++--
.../apache/pulsar/client/cli/TestCmdConsume.java | 23 +++
.../apache/pulsar/client/cli/TestCmdProduce.java | 78 ++++----
.../org/apache/pulsar/client/cli/TestCmdRead.java | 23 +--
pulsar-client-v5/build.gradle.kts | 6 +-
.../client/impl/v5/PulsarClientProviderV5.java | 12 ++
.../pulsar/client/impl/v5/SchemaAdapter.java | 13 ++
.../pulsar/client/impl/v5/SchemaFactoryTest.java | 68 +++++++
22 files changed, 802 insertions(+), 575 deletions(-)
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
index e27a02302bc..670e3bc374a 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/internal/PulsarClientProvider.java
@@ -78,6 +78,10 @@ public interface PulsarClientProvider {
Schema<byte[]> autoProduceBytesSchema();
+ Schema<?> genericSchema(org.apache.pulsar.client.api.v5.schema.SchemaInfo
schemaInfo);
+
+ Schema<byte[]> autoProduceBytesSchema(Schema<?> base);
+
// --- Checkpoint ---
Checkpoint checkpointFromBytes(byte[] data) throws IOException;
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
index da087a95b30..173cf0a77ff 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/Schema.java
@@ -204,4 +204,30 @@ public interface Schema<T> {
static Schema<byte[]> autoProduceBytes() {
return PulsarClientProvider.get().autoProduceBytesSchema();
}
+
+ /**
+ * Build a generic schema from a raw {@link SchemaInfo} definition. Use
this when the schema
+ * is described by a definition document (e.g. an Avro or JSON schema
string) rather than a
+ * compiled POJO class.
+ *
+ * @param schemaInfo the schema descriptor (type + definition bytes)
+ * @return a generic {@link Schema} for the given definition
+ * @see SchemaInfo#of
+ */
+ static Schema<?> generic(SchemaInfo schemaInfo) {
+ return PulsarClientProvider.get().genericSchema(schemaInfo);
+ }
+
+ /**
+ * Get a schema that produces raw bytes while validating them against the
supplied
+ * {@code base} schema (in addition to the topic schema). This is the
wrapping form of
+ * {@link #autoProduceBytes()} — the producer sends already-encoded bytes,
and the bytes are
+ * checked for compatibility with {@code base} before being published.
+ *
+ * @param base the schema the produced bytes must conform to
+ * @return a {@link Schema} for producing pre-encoded bytes validated
against {@code base}
+ */
+ static Schema<byte[]> autoProduceBytesOf(Schema<?> base) {
+ return PulsarClientProvider.get().autoProduceBytesSchema(base);
+ }
}
diff --git
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfo.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfo.java
index 26b875f91b8..910d30f7eae 100644
---
a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfo.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfo.java
@@ -52,4 +52,22 @@ public interface SchemaInfo {
* @return an unmodifiable map of schema property key-value pairs
*/
Map<String, String> properties();
+
+ /**
+ * Build a {@link SchemaInfo} from its components. Useful for constructing
a generic schema
+ * from a raw definition (e.g. an Avro/JSON schema document) via {@link
Schema#generic}.
+ *
+ * @param name the schema name
+ * @param type the schema type
+ * @param schema the raw schema definition bytes (e.g. Avro schema
JSON); may be {@code null}
+ * @param properties additional schema properties; may be {@code null}
+ * @return an immutable {@link SchemaInfo}
+ */
+ static SchemaInfo of(String name, SchemaType type, byte[] schema,
Map<String, String> properties) {
+ return new SchemaInfoRecord(
+ name,
+ type,
+ schema == null ? new byte[0] : schema.clone(),
+ properties == null ? Map.of() : Map.copyOf(properties));
+ }
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfoRecord.java
similarity index 72%
copy from
pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
copy to
pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfoRecord.java
index 10b68648ebb..787540b20c3 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
+++
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/schema/SchemaInfoRecord.java
@@ -16,16 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.cli;
+package org.apache.pulsar.client.api.v5.schema;
-import java.util.concurrent.Callable;
+import java.util.Map;
-public abstract class AbstractCmd implements Callable<Integer> {
- // Picocli entrypoint.
- @Override
- public Integer call() throws Exception {
- return run();
- }
-
- abstract int run() throws Exception;
+/**
+ * Immutable {@link SchemaInfo} value holder backing {@link SchemaInfo#of}.
+ */
+record SchemaInfoRecord(
+ String name,
+ SchemaType type,
+ byte[] schema,
+ Map<String, String> properties
+) implements SchemaInfo {
}
diff --git a/pulsar-client-tools-test/build.gradle.kts
b/pulsar-client-tools-test/build.gradle.kts
index ce9e644b41d..87365636aef 100644
--- a/pulsar-client-tools-test/build.gradle.kts
+++ b/pulsar-client-tools-test/build.gradle.kts
@@ -35,6 +35,7 @@ dependencies {
testImplementation(libs.guava)
testImplementation(project(":pulsar-client-admin-original"))
testImplementation(project(":pulsar-client-original"))
+ testImplementation(project(":pulsar-client-api-v5"))
testImplementation(project(":pulsar-functions:pulsar-functions-api"))
testImplementation(libs.picocli)
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
index c3de66abc5c..5900b4cab8e 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolForceBatchNum.java
@@ -22,32 +22,43 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.impl.PulsarClientImpl;
-import org.mockito.stubbing.Answer;
+import org.apache.pulsar.client.api.v5.MessageBuilder;
+import org.apache.pulsar.client.api.v5.MessageId;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.ProducerBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.Transaction;
+import org.apache.pulsar.client.api.v5.async.AsyncMessageBuilder;
+import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.schema.Schema;
import org.testng.Assert;
/**
- * An implement of {@link PulsarClientTool} for test, which will publish
messages iff there is enough messages
- * in the batch.
+ * An implementation of {@link PulsarClientTool} for test, which publishes
messages only once there
+ * are enough messages in a batch. The producer is forced to batch exactly
{@code batchNum} messages
+ * with an effectively-infinite publish delay, and the synchronous {@code
send()} is turned into an
+ * asynchronous send so the messages actually accumulate into a batch (a
blocking send would flush
+ * each message individually).
*/
-public class PulsarClientToolForceBatchNum extends PulsarClientTool{
+public class PulsarClientToolForceBatchNum extends PulsarClientTool {
private final String topic;
private final int batchNum;
/**
- *
* @param properties properties
* @param topic topic
- * @param batchNum iff there is batchNum messages in the batch, the
producer will flush and send.
+ * @param batchNum iff there are batchNum messages in the batch, the
producer will flush and send.
*/
public PulsarClientToolForceBatchNum(Properties properties, String topic,
int batchNum) {
super(properties);
@@ -55,46 +66,147 @@ public class PulsarClientToolForceBatchNum extends
PulsarClientTool{
this.batchNum = batchNum;
produceCommand = new CmdProduce() {
@Override
- public void updateConfig(ClientBuilder newBuilder, Authentication
authentication, String serviceURL) {
- try {
- super.updateConfig(mockClientBuilder(newBuilder),
authentication, serviceURL);
- } catch (Exception e) {
- Assert.fail("update config fail " + e.getMessage());
- }
+ public void updateConfig(PulsarClientBuilder newBuilder,
Authentication authentication,
+ String serviceURL) {
+ super.updateConfig(mockClientBuilder(newBuilder),
authentication, serviceURL);
}
};
replaceProducerCommand(produceCommand);
}
- @SuppressWarnings({"unchecked", "rawtypes"})
- private ClientBuilder mockClientBuilder(ClientBuilder newBuilder) throws
Exception {
- PulsarClientImpl client = (PulsarClientImpl) newBuilder.build();
- ProducerBuilder<byte[]> producerBuilder = client.newProducer()
- .batchingMaxBytes(Integer.MAX_VALUE)
- .batchingMaxMessages(batchNum)
- .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
- .topic(topic);
- Producer<byte[]> producer = producerBuilder.create();
-
- PulsarClientImpl mockClient = spy(client);
- ProducerBuilder<byte[]> mockProducerBuilder = spy(producerBuilder);
- Producer<byte[]> mockProducer = spy(producer);
- ClientBuilder mockClientBuilder = spy(newBuilder);
-
- doAnswer((Answer<TypedMessageBuilder>) invocation -> {
- TypedMessageBuilder typedMessageBuilder =
spy((TypedMessageBuilder) invocation.callRealMethod());
- doAnswer((Answer<MessageId>) invocation1 -> {
- TypedMessageBuilder mock = ((TypedMessageBuilder)
invocation1.getMock());
- // using sendAsync() to replace send()
- mock.sendAsync();
- return null;
- }).when(typedMessageBuilder).send();
- return typedMessageBuilder;
- }).when(mockProducer).newMessage();
-
- doReturn(mockProducer).when(mockProducerBuilder).create();
-
doReturn(mockProducerBuilder).when(mockClient).newProducer(any(Schema.class));
- doReturn(mockClient).when(mockClientBuilder).build();
- return mockClientBuilder;
+ private PulsarClientBuilder mockClientBuilder(PulsarClientBuilder
newBuilder) {
+ try {
+ PulsarClient realClient = newBuilder.build();
+ PulsarClient spyClient = spy(realClient);
+
+ doAnswer(invocation -> {
+ @SuppressWarnings("unchecked")
+ Schema<byte[]> schema = (Schema<byte[]>)
invocation.getArgument(0);
+ // Build a producer that batches exactly batchNum messages and
(practically) never
+ // flushes on the timer, so batching is deterministic.
CmdProduce will still call
+ // topic()/batchingPolicy() on the returned spy; for the
batched case it leaves the
+ // default batching alone, so this forced policy stands.
+ ProducerBuilder<byte[]> realBuilder =
realClient.newProducer(schema)
+ .topic(topic)
+ .batchingPolicy(BatchingPolicy.builder()
+ .enabled(true)
+ .maxMessages(batchNum)
+ .maxPublishDelay(Duration.ofDays(1))
+ .maxSize(MemorySize.ofBytes(Integer.MAX_VALUE))
+ .build());
+ ProducerBuilder<byte[]> spyBuilder = spy(realBuilder);
+ doAnswer(c ->
forceAsyncSend(realBuilder.create())).when(spyBuilder).create();
+ return spyBuilder;
+ }).when(spyClient).newProducer(any(Schema.class));
+
+ PulsarClientBuilder spyBuilder = spy(newBuilder);
+ doReturn(spyClient).when(spyBuilder).build();
+ return spyBuilder;
+ } catch (Exception e) {
+ Assert.fail("update config fail " + e.getMessage());
+ return newBuilder;
+ }
+ }
+
+ /**
+ * Wrap a producer so that the synchronous {@code newMessage().send()} the
CLI uses is dispatched
+ * asynchronously, letting messages accumulate into a batch instead of
flushing one-by-one. The
+ * send futures are collected and awaited on {@code close()} so no batched
message is lost (the
+ * CLI ignores the per-send result, and the V5 producer does not
implicitly await fire-and-forget
+ * sends issued through a different builder instance).
+ */
+ private static Producer<byte[]> forceAsyncSend(Producer<byte[]>
realProducer) throws Exception {
+ List<CompletableFuture<MessageId>> pending =
Collections.synchronizedList(new ArrayList<>());
+ Producer<byte[]> spyProducer = spy(realProducer);
+ doAnswer(inv -> new
AsyncForwardingMessageBuilder(realProducer.async().newMessage(), pending))
+ .when(spyProducer).newMessage();
+ doAnswer(inv -> {
+ CompletableFuture.allOf(pending.toArray(new
CompletableFuture[0])).join();
+ realProducer.close();
+ return null;
+ }).when(spyProducer).close();
+ return spyProducer;
+ }
+
+ /**
+ * A {@link MessageBuilder} that accumulates metadata onto an {@link
AsyncMessageBuilder} and
+ * fires the send asynchronously, returning {@code null} (the CLI ignores
the send result). The
+ * pending send future is recorded so the producer can await it on close.
+ */
+ private static final class AsyncForwardingMessageBuilder implements
MessageBuilder<byte[]> {
+ private final AsyncMessageBuilder<byte[]> delegate;
+ private final List<CompletableFuture<MessageId>> pending;
+
+ AsyncForwardingMessageBuilder(AsyncMessageBuilder<byte[]> delegate,
+ List<CompletableFuture<MessageId>>
pending) {
+ this.delegate = delegate;
+ this.pending = pending;
+ }
+
+ @Override
+ public MessageId send() {
+ pending.add(delegate.send());
+ return null;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> value(byte[] value) {
+ delegate.value(value);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> key(String key) {
+ delegate.key(key);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> transaction(Transaction txn) {
+ delegate.transaction(txn);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> property(String name, String value) {
+ delegate.property(name, value);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> properties(Map<String, String>
properties) {
+ delegate.properties(properties);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> eventTime(Instant eventTime) {
+ delegate.eventTime(eventTime);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> sequenceId(long sequenceId) {
+ delegate.sequenceId(sequenceId);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> deliverAfter(Duration delay) {
+ delegate.deliverAfter(delay);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> deliverAt(Instant timestamp) {
+ delegate.deliverAt(timestamp);
+ return this;
+ }
+
+ @Override
+ public MessageBuilder<byte[]> replicationClusters(List<String>
clusters) {
+ delegate.replicationClusters(clusters);
+ return this;
+ }
}
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
index 27939e783ad..9db7f9596f9 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolTest.java
@@ -71,7 +71,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
public void testInitialization() throws InterruptedException,
ExecutionException, PulsarAdminException {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
properties.setProperty("memoryLimit", "10M");
@@ -120,7 +120,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
public void testNonDurableSubscribe() throws Exception {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("non-durable");
@@ -160,17 +160,17 @@ public class PulsarClientToolTest extends BrokerTestBase {
Assert.assertFalse(future.isCompletedExceptionally());
future.get();
- Awaitility.await()
- .ignoreExceptions()
- .atMost(Duration.ofMillis(20000))
- .until(()->admin.topics().getSubscriptions(topicName).size()
== 0);
+ // The V5-based pulsar-client has no non-durable subscription mode:
--subscription-mode
+ // NonDurable falls back to a durable subscription (with a warning).
So unlike the v4
+ // client, the subscription is NOT removed when the consumer
disconnects — it persists.
+ assertEquals(admin.topics().getSubscriptions(topicName).size(), 1);
}
@Test(timeOut = 60000)
public void testDurableSubscribe() throws Exception {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("durable");
@@ -212,7 +212,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
@Test(timeOut = 20000)
public void testRead() throws Exception {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("reader");
@@ -262,7 +262,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
@Test(timeOut = 20000)
public void testEncryption() throws Exception {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("encryption");
@@ -303,10 +303,13 @@ public class PulsarClientToolTest extends BrokerTestBase {
}
}
- @Test(timeOut = 20000)
+ // Longer timeout than the other cases: this test forces an immediate
burst of async sends
+ // through the V5 producer right after create(), which can race the
scalable-topic segment
+ // layout becoming active and retry with exponential backoff before the
first batch lands.
+ @Test(timeOut = 60000)
public void testDisableBatching() throws Exception {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("disable-batching");
@@ -414,7 +417,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
@Test
public void testSendMultipleMessage() throws Exception {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
final String topicName = getTopicWithRandomSuffix("test-multiple-msg");
@@ -475,7 +478,9 @@ public class PulsarClientToolTest extends BrokerTestBase {
}
- @Test
+ // KeyValue schema production is not yet supported by the V5-based
pulsar-client (CmdProduce
+ // rejects --key-value-encoding-type with a clear message); deferred to a
follow-up.
+ @Test(enabled = false)
public void testProduceKeyValueSchemaInlineValue() throws Exception {
Properties properties = initializeToolProperties();
@@ -525,7 +530,9 @@ public class PulsarClientToolTest extends BrokerTestBase {
};
}
- @Test(dataProvider = "keyValueKeySchema")
+ // KeyValue schema production is not yet supported by the V5-based
pulsar-client (CmdProduce
+ // rejects --key-value-encoding-type with a clear message); deferred to a
follow-up.
+ @Test(dataProvider = "keyValueKeySchema", enabled = false)
public void testProduceKeyValueSchemaFileValue(String schema) throws
Exception {
Properties properties = initializeToolProperties();
@@ -582,7 +589,7 @@ public class PulsarClientToolTest extends BrokerTestBase {
private Properties initializeToolProperties() {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
return properties;
}
diff --git
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
index 60a37fe4c3b..44035265867 100644
---
a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
+++
b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/client/cli/PulsarClientToolWsTest.java
@@ -50,7 +50,7 @@ public class PulsarClientToolWsTest extends BrokerTestBase {
@Test(timeOut = 30000)
public void testWebSocketNonDurableSubscriptionMode() throws Exception {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
final String topicName = "persistent://my-property/my-ns/topic-" +
UUID.randomUUID();
@@ -88,16 +88,16 @@ public class PulsarClientToolWsTest extends BrokerTestBase {
Assert.assertFalse(future.isCompletedExceptionally());
}
- Awaitility.await()
- .ignoreExceptions().untilAsserted(() -> {
-
Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(), 0);
- });
+ // The V5-based pulsar-client has no non-durable subscription mode:
--subscription-mode
+ // NonDurable falls back to a durable subscription, so it persists
after the consumer
+ // disconnects rather than being removed.
+ Assert.assertEquals(admin.topics().getSubscriptions(topicName).size(),
1);
}
@Test(timeOut = 30000)
public void testWebSocketDurableSubscriptionMode() throws Exception {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
final String topicName = "persistent://my-property/my-ns/topic-" +
UUID.randomUUID();
@@ -145,7 +145,7 @@ public class PulsarClientToolWsTest extends BrokerTestBase {
@Test(timeOut = 30000)
public void testWebSocketReader() throws Exception {
Properties properties = new Properties();
- properties.setProperty("serviceUrl", brokerUrl.toString());
+ properties.setProperty("serviceUrl", pulsar.getBrokerServiceUrl());
properties.setProperty("useTls", "false");
final String topicName = "persistent://my-property/my-ns/topic-" +
UUID.randomUUID();
diff --git a/pulsar-client-tools/build.gradle.kts
b/pulsar-client-tools/build.gradle.kts
index 0da8750658c..f2555cb9d14 100644
--- a/pulsar-client-tools/build.gradle.kts
+++ b/pulsar-client-tools/build.gradle.kts
@@ -27,6 +27,8 @@ dependencies {
implementation(project(":pulsar-client-admin-api"))
implementation(project(":pulsar-client-admin-original"))
implementation(project(":pulsar-client-original"))
+ implementation(project(":pulsar-client-api-v5"))
+ implementation(project(":pulsar-client-v5"))
implementation(project(":pulsar-common"))
implementation(project(":pulsar-client-messagecrypto-bc"))
implementation(project(":pulsar-cli-utils"))
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
index 10b68648ebb..e3e4b08b36e 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmd.java
@@ -18,6 +18,8 @@
*/
package org.apache.pulsar.client.cli;
+import java.net.URI;
+import java.nio.file.Path;
import java.util.concurrent.Callable;
public abstract class AbstractCmd implements Callable<Integer> {
@@ -28,4 +30,28 @@ public abstract class AbstractCmd implements
Callable<Integer> {
}
abstract int run() throws Exception;
+
+ /**
+ * Resolve a {@code file:} URI (as accepted by the encryption-key flags)
to a {@link Path}.
+ * Supports both the hierarchical form ({@code file:///abs/path}, where
{@link URI#getPath()}
+ * is set) and the opaque relative form ({@code file:rel/path}, where the
path lives in the
+ * scheme-specific part).
+ *
+ * @param fileUri a {@code file:} URI string
+ * @return the resolved {@link Path}
+ * @throws IllegalArgumentException if the URI scheme is not {@code file}
+ */
+ static Path fileUriToPath(String fileUri) {
+ URI uri = URI.create(fileUri);
+ if (!"file".equalsIgnoreCase(uri.getScheme())) {
+ throw new IllegalArgumentException("This version of pulsar-client
supports only file:// "
+ + "encryption keys; got '" + fileUri + "'.");
+ }
+ String path = uri.getPath();
+ if (path == null) {
+ // Opaque (relative) file: URI, e.g. file:../certs/key.pem
+ path = uri.getSchemeSpecificPart();
+ }
+ return Path.of(path);
+ }
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
index 68b0abd5961..543c6f99100 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/AbstractCmdConsume.java
@@ -18,17 +18,13 @@
*/
package org.apache.pulsar.client.cli;
-import static
org.apache.pulsar.client.internal.PulsarClientImplementationBinding.getBytes;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.nio.file.Files;
import java.util.Arrays;
-import java.util.Base64;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -36,14 +32,12 @@ import java.util.concurrent.TimeUnit;
import lombok.CustomLog;
import org.apache.commons.io.HexDump;
import org.apache.pulsar.client.api.Authentication;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.schema.Field;
-import org.apache.pulsar.client.api.schema.GenericObject;
-import org.apache.pulsar.client.api.schema.GenericRecord;
-import org.apache.pulsar.common.api.EncryptionContext;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.util.DateFormatter;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.auth.EncryptionKey;
+import org.apache.pulsar.client.api.v5.auth.PrivateKeyProvider;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Session;
@@ -63,7 +57,7 @@ public abstract class AbstractCmdConsume extends AbstractCmd {
protected static final Logger LOG =
LoggerFactory.getLogger(PulsarClientTool.class);
protected static final String MESSAGE_BOUNDARY = "----- got message -----";
- protected ClientBuilder clientBuilder;
+ protected PulsarClientBuilder clientBuilder;
protected Authentication authentication;
protected String serviceURL;
@@ -75,7 +69,7 @@ public abstract class AbstractCmdConsume extends AbstractCmd {
* Set client configuration.
*
*/
- public void updateConfig(ClientBuilder clientBuilder, Authentication
authentication, String serviceURL) {
+ public void updateConfig(PulsarClientBuilder clientBuilder, Authentication
authentication, String serviceURL) {
this.clientBuilder = clientBuilder;
this.authentication = authentication;
this.serviceURL = serviceURL;
@@ -90,145 +84,64 @@ public abstract class AbstractCmdConsume extends
AbstractCmd {
* Whether to display BytesMessages in hexdump style, ignored
for simple text messages
* @return String representation of the message
*/
- protected String interpretMessage(Message<?> message, boolean displayHex,
boolean printMetadata)
+ protected String interpretMessage(Message<byte[]> message, boolean
displayHex, boolean printMetadata)
throws IOException {
StringBuilder sb = new StringBuilder();
- String properties =
Arrays.toString(message.getProperties().entrySet().toArray());
+ String properties =
Arrays.toString(message.properties().entrySet().toArray());
- String data;
- Object value = message.getValue();
- if (value == null) {
- data = "null";
- } else if (value instanceof byte[]) {
- byte[] msgData = (byte[]) value;
- data = interpretByteArray(displayHex, msgData);
- } else if (value instanceof GenericObject) {
- Map<String, Object> asMap = genericObjectToMap((GenericObject)
value, displayHex);
- data = asMap.toString();
- } else if (value instanceof ByteBuffer) {
- data = new String(getBytes((ByteBuffer) value));
- } else {
- data = value.toString();
- }
-
- sb.append("publishTime:[").append(message.getPublishTime()).append("],
");
- sb.append("eventTime:[").append(message.getEventTime()).append("], ");
+ byte[] value = message.value();
+ String data = value == null ? "null" : interpretByteArray(displayHex,
value);
- String key = null;
- if (message.hasKey()) {
- key = message.getKey();
- }
-
- sb.append("key:[").append(key).append("], ");
+ sb.append("publishTime:[").append(message.publishTime()).append("], ");
+
sb.append("eventTime:[").append(message.eventTime().orElse(null)).append("], ");
+ sb.append("key:[").append(message.key().orElse(null)).append("], ");
if (!properties.isEmpty()) {
sb.append("properties:").append(properties).append(", ");
}
sb.append("content:").append(data);
if (printMetadata) {
- if (message.getEncryptionCtx().isPresent()) {
- EncryptionContext encContext =
message.getEncryptionCtx().get();
- if (encContext.getKeys() != null &&
!encContext.getKeys().isEmpty()) {
- sb.append(", ");
- sb.append("encryption-keys:").append(", ");
- encContext.getKeys().forEach((keyName, keyInfo) -> {
- String metadata =
Arrays.toString(keyInfo.getMetadata().entrySet().toArray());
- sb.append("name:").append(keyName).append(",
").append("key-value:")
-
.append(Base64.getEncoder().encodeToString(keyInfo.getKeyValue())).append(", ")
-
.append("metadata:").append(metadata).append(", ");
-
- });
- sb.append(",
").append("param:").append(Base64.getEncoder().encodeToString(encContext.getParam()))
- .append(",
").append("algorithm:").append(encContext.getAlgorithm()).append(", ")
-
.append("compression-type:").append(encContext.getCompressionType()).append(",
")
-
.append("uncompressed-size").append(encContext.getUncompressedMessageSize()).append(",
")
- .append("batch-size")
- .append(encContext.getBatchSize().isPresent() ?
encContext.getBatchSize().get() : 1);
- }
- }
- if (message.hasBrokerPublishTime()) {
- sb.append(",
").append("publish-time:").append(DateFormatter.format(message.getPublishTime()));
- }
- sb.append(",
").append("event-time:").append(DateFormatter.format(message.getEventTime()));
- sb.append(",
").append("message-id:").append(message.getMessageId());
- sb.append(",
").append("producer-name:").append(message.getProducerName());
- sb.append(",
").append("sequence-id:").append(message.getSequenceId());
- sb.append(",
").append("replicated-from:").append(message.getReplicatedFrom());
- sb.append(",
").append("redelivery-count:").append(message.getRedeliveryCount());
- sb.append(", ").append("ordering-key:")
- .append(message.getOrderingKey() != null ? new
String(message.getOrderingKey()) : "");
- sb.append(", ").append("schema-version:")
- .append(message.getSchemaVersion() != null ? new
String(message.getSchemaVersion()) : "");
- if (message.hasIndex()) {
- sb.append(", ").append("index:").append(message.getIndex());
- }
+ sb.append(", ").append("message-id:").append(message.id());
+ sb.append(",
").append("producer-name:").append(message.producerName().orElse(null));
+ sb.append(",
").append("sequence-id:").append(message.sequenceId());
+ sb.append(",
").append("replicated-from:").append(message.replicatedFrom().orElse(null));
+ sb.append(",
").append("redelivery-count:").append(message.redeliveryCount());
}
return sb.toString();
}
protected static String interpretByteArray(boolean displayHex, byte[]
msgData) throws IOException {
- String data;
- ByteArrayOutputStream out = new ByteArrayOutputStream();
if (!displayHex) {
return new String(msgData);
} else {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
HexDump.dump(msgData, 0, out, 0);
return out.toString();
}
}
- protected static Map<String, Object> genericObjectToMap(GenericObject
value, boolean displayHex)
- throws IOException {
- switch (value.getSchemaType()) {
- case AVRO:
- case JSON:
- case PROTOBUF_NATIVE:
- return genericRecordToMap((GenericRecord) value,
displayHex);
- case KEY_VALUE:
- return keyValueToMap((KeyValue<?, ?>)
value.getNativeObject(), displayHex);
- default:
- return primitiveValueToMap(value.getNativeObject(),
displayHex);
- }
- }
-
- protected static Map<String, Object> keyValueToMap(KeyValue<?, ?> value,
boolean displayHex) throws IOException {
- if (value == null) {
- return Map.of("value", "NULL");
- }
- return Map.of("key", primitiveValueToMap(value.getKey(), displayHex),
- "value", primitiveValueToMap(value.getValue(), displayHex));
- }
-
- protected static Map<String, Object> primitiveValueToMap(Object value,
boolean displayHex) throws IOException {
- if (value == null) {
- return Map.of("value", "NULL");
- }
- if (value instanceof GenericObject) {
- return genericObjectToMap((GenericObject) value, displayHex);
- }
- if (value instanceof byte[]) {
- value = interpretByteArray(displayHex, (byte[]) value);
- }
- return Map.of("value", value.toString(), "type", value.getClass());
- }
-
- protected static Map<String, Object> genericRecordToMap(GenericRecord
value, boolean displayHex)
- throws IOException {
- Map<String, Object> res = new HashMap<>();
- for (Field f : value.getFields()) {
- Object fieldValue = value.getField(f);
- if (fieldValue instanceof GenericRecord) {
- fieldValue = genericRecordToMap((GenericRecord) fieldValue,
displayHex);
- } else if (fieldValue == null) {
- fieldValue = "NULL";
- } else if (fieldValue instanceof byte[]) {
- fieldValue = interpretByteArray(displayHex, (byte[])
fieldValue);
- }
- res.put(f.getName(), fieldValue);
- }
- return res;
+ /**
+ * Build a consumer-side decryption policy from a {@code file://} key URI,
mirroring the v4
+ * {@code defaultCryptoKeyReader(uri)} semantics: the private key is
loaded once and returned
+ * for any key name. (The producer's logical key name travels in the
message metadata, so a
+ * name-keyed provider would not resolve it; the CLI's file-based flow has
a single key.)
+ */
+ protected static ConsumerEncryptionPolicy buildFileDecryptionPolicy(
+ String keyUri, ConsumerCryptoFailureAction failureAction) {
+ final byte[] keyBytes;
+ try {
+ keyBytes = Files.readAllBytes(fileUriToPath(keyUri));
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to read decryption key
from " + keyUri, e);
+ }
+ PrivateKeyProvider provider = (keyName, metadata) ->
+ CompletableFuture.completedFuture(EncryptionKey.of(keyBytes));
+ return ConsumerEncryptionPolicy.builder()
+ .privateKeyProvider(provider)
+ .failureAction(failureAction)
+ .build();
}
@WebSocket
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 4a5254cd9c0..4c7164d7db3 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -23,21 +23,20 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.net.URI;
+import java.time.Duration;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.ConsumerBuilder;
-import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
-import org.apache.pulsar.client.api.SubscriptionMode;
-import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.QueueConsumer;
+import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -56,6 +55,27 @@ import picocli.CommandLine.Spec;
@Command(description = "Consume messages from a specified topic")
public class CmdConsume extends AbstractCmdConsume {
+ /**
+ * v4-compatible subscription-type flag. This version of pulsar-client
consumes through a V5
+ * {@link QueueConsumer} for all types, since it is the only consumer that
works against both
+ * regular and scalable topics (the ordered StreamConsumer requires a
scalable-topic
+ * controller). Exclusive / Failover therefore get work-queue
(Shared-style) semantics and log
+ * a warning rather than preserving single-reader ordering.
+ */
+ public enum SubscriptionType {
+ Exclusive,
+ Shared,
+ Failover,
+ Key_Shared
+ }
+
+ /** v4-compatible subscription-mode flag. Only honored by the WebSocket
path; the V5 binary
+ * consumer is always durable, so NonDurable logs a warning. */
+ public enum SubscriptionMode {
+ Durable,
+ NonDurable
+ }
+
@Parameters(description = "TopicName", arity = "1")
private String topic;
@@ -66,7 +86,7 @@ public class CmdConsume extends AbstractCmdConsume {
private SubscriptionMode subscriptionMode = SubscriptionMode.Durable;
@Option(names = { "-p", "--subscription-position" }, description =
"Subscription position.")
- private SubscriptionInitialPosition subscriptionInitialPosition =
SubscriptionInitialPosition.Latest;
+ private SubscriptionInitialPosition subscriptionInitialPosition =
SubscriptionInitialPosition.LATEST;
@Option(names = { "-s", "--subscription-name" }, required = true,
description = "Subscription name.")
private String subscriptionName;
@@ -119,9 +139,6 @@ public class CmdConsume extends AbstractCmdConsume {
@Option(names = { "-mp", "--print-metadata" }, description = "Message
metadata")
private boolean printMetadata = false;
- @Option(names = { "-stp", "--start-timestamp" }, description = "Start
timestamp for consuming messages")
- private long startTimestamp = 0L;
-
@Option(names = { "-etp", "--end-timestamp" }, description = "End
timestamp for consuming messages")
private long endTimestamp = Long.MAX_VALUE;
@@ -146,18 +163,10 @@ public class CmdConsume extends AbstractCmdConsume {
throw new CommandLine.ParameterException(commandSpec.commandLine(),
"Number of messages should be zero or positive.");
}
- if (this.startTimestamp < 0) {
- throw new CommandLine.ParameterException(commandSpec.commandLine(),
- "start timestamp should be positive.");
- }
if (this.endTimestamp < 0) {
throw new CommandLine.ParameterException(commandSpec.commandLine(),
"end timestamp should be positive.");
}
- if (this.endTimestamp < startTimestamp) {
- throw new CommandLine.ParameterException(commandSpec.commandLine(),
- "end timestamp should larger than start timestamp.");
- }
if (this.serviceURL.startsWith("ws")) {
return consumeFromWebSocket(topic);
@@ -170,71 +179,67 @@ public class CmdConsume extends AbstractCmdConsume {
int numMessagesConsumed = 0;
int returnCode = 0;
+ if ("auto_consume".equals(schemaType)) {
+ throw new IllegalArgumentException("schema type 'auto_consume' is
not supported by this "
+ + "version of pulsar-client; consume with 'bytes' (the
default).");
+ } else if (!"bytes".equals(schemaType)) {
+ throw new IllegalArgumentException("schema type must be 'bytes'");
+ }
+ if (!poolMessages) {
+ LOG.info("--pool-messages has no effect on this version of
pulsar-client.");
+ }
+ if (subscriptionMode == SubscriptionMode.NonDurable) {
+ LOG.warn("--subscription-mode NonDurable is not supported by this
version of pulsar-client; "
+ + "a durable subscription is used instead.");
+ }
+ if (subscriptionType == SubscriptionType.Exclusive || subscriptionType
== SubscriptionType.Failover) {
+ // The V5 StreamConsumer (ordered, single-reader) requires a
scalable-topic subscription
+ // controller, which regular topics do not have; only the
QueueConsumer works against
+ // both regular and scalable topics. So all subscription types use
a QueueConsumer here
+ // and Exclusive/Failover get work-queue (Shared-style) semantics
rather than ordered.
+ LOG.warn("--subscription-type {} : this version of pulsar-client
consumes via a work-queue "
+ + "(Shared-style) subscription; exclusive/failover
ordering is not preserved.",
+ subscriptionType);
+ }
+ if (maxPendingChunkedMessage > 0 ||
autoAckOldestChunkedMessageOnQueueFull) {
+ LOG.warn("Chunked-message knobs (--max_chunked_msg /
--auto_ack_chunk_q_full) have no effect "
+ + "on this version of pulsar-client.");
+ }
+
try (PulsarClient client = clientBuilder.build()) {
- ConsumerBuilder<?> builder;
- Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
- if ("auto_consume".equals(schemaType)) {
- schema = Schema.AUTO_CONSUME();
- } else if (!"bytes".equals(schemaType)) {
- throw new IllegalArgumentException("schema type must be
'bytes' or 'auto_consume'");
- }
- builder = client.newConsumer(schema)
+ RateLimiter limiter = (this.consumeRate > 0) ?
RateLimiter.create(this.consumeRate) : null;
+ QueueConsumerBuilder<byte[]> builder =
client.newQueueConsumer(Schema.bytes())
.subscriptionName(this.subscriptionName)
- .subscriptionType(subscriptionType)
- .subscriptionMode(subscriptionMode)
.subscriptionInitialPosition(subscriptionInitialPosition)
- .poolMessages(poolMessages)
.replicateSubscriptionState(replicateSubscriptionState);
-
- if (isRegex) {
- builder.topicsPattern(Pattern.compile(topic));
- } else {
- builder.topic(topic);
- }
-
- if (this.maxPendingChunkedMessage > 0) {
-
builder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
- }
if (this.receiverQueueSize > 0) {
builder.receiverQueueSize(this.receiverQueueSize);
}
-
-
builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
- builder.cryptoFailureAction(cryptoFailureAction);
-
if (isNotBlank(this.encKeyValue)) {
- builder.defaultCryptoKeyReader(this.encKeyValue);
+ builder.encryptionPolicy(buildConsumerEncryptionPolicy());
}
+ applyTopicSelection(builder::topic, builder::namespace);
- try (Consumer<?> consumer = builder.subscribe();) {
- if (startTimestamp > 0L) {
- consumer.seek(startTimestamp);
- }
- RateLimiter limiter = (this.consumeRate > 0) ?
RateLimiter.create(this.consumeRate) : null;
+ try (QueueConsumer<byte[]> consumer = builder.subscribe()) {
while (this.numMessagesToConsume == 0 || numMessagesConsumed <
this.numMessagesToConsume) {
if (limiter != null) {
limiter.acquire();
}
- Message<?> msg = consumer.receive(5, TimeUnit.SECONDS);
+ Message<byte[]> msg =
consumer.receive(Duration.ofSeconds(5));
if (msg == null) {
LOG.debug("No message to consume after waiting for 5
seconds.");
} else {
- try {
- if (msg.getPublishTime() > endTimestamp) {
- break;
- }
- numMessagesConsumed += 1;
- if (!hideContent) {
- System.out.println(MESSAGE_BOUNDARY);
- String output = this.interpretMessage(msg,
displayHex, printMetadata);
- System.out.println(output);
- } else if (numMessagesConsumed % 1000 == 0) {
- System.out.println("Received " +
numMessagesConsumed + " messages");
- }
- consumer.acknowledge(msg);
- } finally {
- msg.release();
+ if (msg.publishTime().toEpochMilli() > endTimestamp) {
+ break;
+ }
+ numMessagesConsumed += 1;
+ if (!hideContent) {
+ System.out.println(MESSAGE_BOUNDARY);
+ System.out.println(this.interpretMessage(msg,
displayHex, printMetadata));
+ } else if (numMessagesConsumed % 1000 == 0) {
+ System.out.println("Received " +
numMessagesConsumed + " messages");
}
+ consumer.acknowledge(msg.id());
}
}
}
@@ -247,7 +252,41 @@ public class CmdConsume extends AbstractCmdConsume {
}
return returnCode;
+ }
+
+ /**
+ * Apply the topic argument to the consumer. A plain topic uses {@code
topic(...)}; a
+ * {@code --regex} pattern is mapped to a namespace subscription over the
pattern's
+ * {@code tenant/namespace} (V5 has no topic-regex; namespace
subscriptions follow the
+ * namespace live).
+ */
+ private void applyTopicSelection(java.util.function.Consumer<String>
topicFn,
+ java.util.function.Consumer<String>
namespaceFn) {
+ if (isRegex) {
+ namespaceFn.accept(namespaceFromPattern(topic));
+ } else {
+ topicFn.accept(topic);
+ }
+ }
+
+ static String namespaceFromPattern(String pattern) {
+ // Strip an optional persistent:// / non-persistent:// domain prefix,
then take the first
+ // two path segments as tenant/namespace.
+ String rest = pattern;
+ int scheme = rest.indexOf("://");
+ if (scheme >= 0) {
+ rest = rest.substring(scheme + 3);
+ }
+ String[] parts = rest.split("/");
+ if (parts.length < 2) {
+ throw new IllegalArgumentException("Cannot derive a
tenant/namespace from --regex pattern '"
+ + pattern + "'. Use a fully-qualified pattern, e.g.
persistent://tenant/namespace/.*");
+ }
+ return parts[0] + "/" + parts[1];
+ }
+ private ConsumerEncryptionPolicy buildConsumerEncryptionPolicy() {
+ return buildFileDecryptionPolicy(this.encKeyValue,
cryptoFailureAction);
}
@VisibleForTesting
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
index ae910f1913d..8de079f7bfa 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdProduce.java
@@ -35,7 +35,6 @@ import java.util.Base64;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -49,19 +48,19 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.ClientBuilder;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerBuilder;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.TypedMessageBuilder;
-import org.apache.pulsar.client.api.schema.KeyValueSchema;
-import org.apache.pulsar.client.impl.schema.SchemaInfoImpl;
+import org.apache.pulsar.client.api.v5.MessageBuilder;
+import org.apache.pulsar.client.api.v5.Producer;
+import org.apache.pulsar.client.api.v5.ProducerBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.config.BatchingPolicy;
+import org.apache.pulsar.client.api.v5.config.ChunkingPolicy;
+import org.apache.pulsar.client.api.v5.config.ProducerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.api.v5.schema.SchemaInfo;
+import org.apache.pulsar.client.api.v5.schema.SchemaType;
import org.apache.pulsar.common.naming.TopicName;
-import org.apache.pulsar.common.schema.KeyValue;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.websocket.data.ProducerMessage;
import org.eclipse.jetty.client.HttpClient;
@@ -89,8 +88,6 @@ import picocli.CommandLine.Spec;
public class CmdProduce extends AbstractCmd {
private static final int MAX_MESSAGES = 1000;
static final String KEY_VALUE_ENCODING_TYPE_NOT_SET = "";
- private static final String KEY_VALUE_ENCODING_TYPE_SEPARATED =
"separated";
- private static final String KEY_VALUE_ENCODING_TYPE_INLINE = "inline";
@Parameters(description = "TopicName", arity = "1")
private String topic;
@@ -160,7 +157,7 @@ public class CmdProduce extends AbstractCmd {
"--disable-replication" }, description = "Disable geo-replication
for messages.")
private boolean disableReplication = false;
- private ClientBuilder clientBuilder;
+ private PulsarClientBuilder clientBuilder;
private Authentication authentication;
private String serviceURL;
@@ -171,7 +168,7 @@ public class CmdProduce extends AbstractCmd {
/**
* Set Pulsar client configuration.
*/
- public void updateConfig(ClientBuilder newBuilder, Authentication
authentication, String serviceURL) {
+ public void updateConfig(PulsarClientBuilder newBuilder, Authentication
authentication, String serviceURL) {
this.clientBuilder = newBuilder;
this.authentication = authentication;
this.serviceURL = serviceURL;
@@ -186,20 +183,15 @@ public class CmdProduce extends AbstractCmd {
*
* @return list of message bodies
*/
- @SuppressWarnings({"unchecked", "rawtypes"})
static List<byte[]> generateMessageBodies(List<String> stringMessages,
List<String> messageFileNames,
- Schema schema) {
+ org.apache.avro.Schema
avroSchema) {
List<byte[]> messageBodies = new ArrayList<>();
for (String m : stringMessages) {
- if (schema.getSchemaInfo().getType() == SchemaType.AVRO) {
- // JSON TO AVRO
- @SuppressWarnings("unchecked")
- Optional<org.apache.avro.Schema> nativeSchema =
- (Optional<org.apache.avro.Schema>) (Optional<?>)
schema.getNativeSchema();
- org.apache.avro.Schema avroSchema = nativeSchema.get();
- byte[] encoded = jsonToAvro(m, avroSchema);
- messageBodies.add(encoded);
+ if (avroSchema != null) {
+ // JSON TO AVRO — the V5 Schema does not expose the native
Avro schema, so the
+ // caller passes the parsed Avro definition directly.
+ messageBodies.add(jsonToAvro(m, avroSchema));
} else {
messageBodies.add(m.getBytes());
}
@@ -267,15 +259,11 @@ public class CmdProduce extends AbstractCmd {
if (keyValueEncodingType == null) {
keyValueEncodingType = KEY_VALUE_ENCODING_TYPE_NOT_SET;
- } else {
- switch (keyValueEncodingType) {
- case KEY_VALUE_ENCODING_TYPE_SEPARATED:
- case KEY_VALUE_ENCODING_TYPE_INLINE:
- break;
- default:
- throw (new
IllegalArgumentException("--key-value-encoding-type "
- + keyValueEncodingType + " is not valid, only
'separated' or 'inline'"));
- }
+ } else if
(!KEY_VALUE_ENCODING_TYPE_NOT_SET.equals(keyValueEncodingType)) {
+ // KeyValue schemas are not yet supported by the V5-based
pulsar-client.
+ throw new IllegalArgumentException("KeyValue schemas
(--key-value-encoding-type) are not "
+ + "supported by this version of pulsar-client; produce
with a plain value schema "
+ + "(-vs bytes|string|avro:<def>|json:<def>) instead.");
}
int totalMessages = (messages.size() + messageFileNames.size()) *
numTimesProduce;
@@ -296,24 +284,25 @@ public class CmdProduce extends AbstractCmd {
int numMessagesSent = 0;
int returnCode = 0;
- try (PulsarClient client = clientBuilder.build()){
- Schema<?> schema = buildSchema(this.keySchema, this.valueSchema,
this.keyValueEncodingType);
- ProducerBuilder<?> producerBuilder =
client.newProducer(schema).topic(topic);
+ if (this.disableReplication) {
+ log.warn("--disable-replication has no effect on this version of
pulsar-client and is ignored.");
+ }
+
+ try (PulsarClient client = clientBuilder.build()) {
+ ValueSchema vs = buildValueSchema(this.valueSchema);
+ ProducerBuilder<byte[]> producerBuilder =
client.newProducer(vs.schema).topic(topic);
if (this.chunkingAllowed) {
- producerBuilder.enableChunking(true);
- producerBuilder.enableBatching(false);
+
producerBuilder.chunkingPolicy(ChunkingPolicy.builder().enabled(true).build());
+ producerBuilder.batchingPolicy(BatchingPolicy.ofDisabled());
} else if (this.disableBatching) {
- producerBuilder.enableBatching(false);
+ producerBuilder.batchingPolicy(BatchingPolicy.ofDisabled());
}
if (isNotBlank(this.encKeyName) && isNotBlank(this.encKeyValue)) {
- producerBuilder.addEncryptionKey(this.encKeyName);
- producerBuilder.defaultCryptoKeyReader(this.encKeyValue);
+
producerBuilder.encryptionPolicy(buildEncryptionPolicy(this.encKeyName,
this.encKeyValue));
}
- try (Producer<?> producer = producerBuilder.create();) {
- Schema<?> schemaForPayload = schema.getSchemaInfo().getType()
== SchemaType.KEY_VALUE
- ? ((KeyValueSchema) schema).getValueSchema() : schema;
+ try (Producer<byte[]> producer = producerBuilder.create()) {
List<byte[]> messageBodies =
generateMessageBodies(this.messages, this.messageFileNames,
- schemaForPayload);
+ vs.avroNative);
RateLimiter limiter = (this.publishRate > 0) ?
RateLimiter.create(this.publishRate) : null;
Map<String, String> kvMap = new HashMap<>();
@@ -322,63 +311,21 @@ public class CmdProduce extends AbstractCmd {
kvMap.put(kv[0], kv[1]);
}
- final byte[] keyValueKeyBytes;
- if (this.keyValueKey != null) {
- if (keyValueEncodingType ==
KEY_VALUE_ENCODING_TYPE_NOT_SET) {
- throw new IllegalArgumentException(
- "Key value encoding type must be set when using
--key-value-key");
- }
- keyValueKeyBytes =
this.keyValueKey.getBytes(StandardCharsets.UTF_8);
- } else if (this.keyValueKeyFile != null) {
- if (keyValueEncodingType ==
KEY_VALUE_ENCODING_TYPE_NOT_SET) {
- throw new IllegalArgumentException(
- "Key value encoding type must be set when using
--key-value-key-file");
- }
- keyValueKeyBytes =
Files.readAllBytes(Paths.get(this.keyValueKeyFile));
- } else if (this.key != null) {
- keyValueKeyBytes =
this.key.getBytes(StandardCharsets.UTF_8);
- } else {
- keyValueKeyBytes = null;
- }
-
for (int i = 0; i < this.numTimesProduce; i++) {
for (byte[] content : messageBodies) {
if (limiter != null) {
limiter.acquire();
}
- @SuppressWarnings("unchecked")
- TypedMessageBuilder<Object> message =
(TypedMessageBuilder<Object>) producer.newMessage();
-
+ MessageBuilder<byte[]> message = producer.newMessage();
if (!kvMap.isEmpty()) {
message.properties(kvMap);
}
-
- switch (keyValueEncodingType) {
- case KEY_VALUE_ENCODING_TYPE_NOT_SET:
- if (key != null && !key.isEmpty()) {
- message.key(key);
- }
- message.value(content);
- break;
- case KEY_VALUE_ENCODING_TYPE_SEPARATED:
- case KEY_VALUE_ENCODING_TYPE_INLINE:
- KeyValue<byte[], byte[]> kv = new KeyValue<>(
- keyValueKeyBytes,
- content);
- message.value(kv);
- break;
- default:
- throw new IllegalStateException();
+ if (key != null && !key.isEmpty()) {
+ message.key(key);
}
-
- if (disableReplication) {
- message.disableReplication();
- }
-
+ message.value(content);
message.send();
-
-
numMessagesSent++;
}
}
@@ -393,52 +340,43 @@ public class CmdProduce extends AbstractCmd {
return returnCode;
}
- static Schema<?> buildSchema(String keySchema, String schema, String
keyValueEncodingType) {
- switch (keyValueEncodingType) {
- case KEY_VALUE_ENCODING_TYPE_NOT_SET:
- return buildComponentSchema(schema);
- case KEY_VALUE_ENCODING_TYPE_SEPARATED:
- return Schema.KeyValue(buildComponentSchema(keySchema),
buildComponentSchema(schema),
- KeyValueEncodingType.SEPARATED);
- case KEY_VALUE_ENCODING_TYPE_INLINE:
- return Schema.KeyValue(buildComponentSchema(keySchema),
buildComponentSchema(schema),
- KeyValueEncodingType.INLINE);
- default:
- throw new IllegalArgumentException("Invalid
KeyValueEncodingType "
- + keyValueEncodingType + ", only: 'none','separated'
and 'inline");
- }
+ /** A V5 producer schema (always {@code byte[]}) plus, for {@code avro:},
the parsed Avro
+ * definition used to convert JSON input into Avro bytes. */
+ record ValueSchema(Schema<byte[]> schema, org.apache.avro.Schema
avroNative) {
}
- private static Schema<?> buildComponentSchema(String schema) {
- Schema<?> base;
- switch (schema) {
- case "string":
- base = Schema.STRING;
- break;
+ static ValueSchema buildValueSchema(String valueSchema) {
+ switch (valueSchema) {
case "bytes":
- // no need for wrappers
- return Schema.BYTES;
+ return new ValueSchema(Schema.bytes(), null);
+ case "string":
+ return new
ValueSchema(Schema.autoProduceBytesOf(Schema.string()), null);
default:
- if (schema.startsWith("avro:")) {
- base = buildGenericSchema(SchemaType.AVRO,
schema.substring(5));
- } else if (schema.startsWith("json:")) {
- base = buildGenericSchema(SchemaType.JSON,
schema.substring(5));
- } else {
- throw new IllegalArgumentException("Invalid schema type: "
+ schema);
+ if (valueSchema.startsWith("avro:")) {
+ String def = valueSchema.substring(5);
+ org.apache.avro.Schema avroNative = new
org.apache.avro.Schema.Parser().parse(def);
+ Schema<?> generic = Schema.generic(
+ SchemaInfo.of("client", SchemaType.AVRO,
+ def.getBytes(StandardCharsets.UTF_8),
null));
+ return new ValueSchema(Schema.autoProduceBytesOf(generic),
avroNative);
+ } else if (valueSchema.startsWith("json:")) {
+ String def = valueSchema.substring(5);
+ Schema<?> generic = Schema.generic(
+ SchemaInfo.of("client", SchemaType.JSON,
+ def.getBytes(StandardCharsets.UTF_8),
null));
+ return new ValueSchema(Schema.autoProduceBytesOf(generic),
null);
}
+ throw new IllegalArgumentException("Invalid schema type: " +
valueSchema);
}
- return Schema.AUTO_PRODUCE_BYTES(base);
}
- private static Schema<?> buildGenericSchema(SchemaType type, String
definition) {
- return Schema.generic(SchemaInfoImpl
- .builder()
- .schema(definition.getBytes(StandardCharsets.UTF_8))
- .name("client")
- .properties(new HashMap<>())
- .type(type)
- .build());
-
+ private static ProducerEncryptionPolicy buildEncryptionPolicy(String
keyName, String keyUri) {
+ return ProducerEncryptionPolicy.builder()
+
.publicKeyProvider(org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider.builder()
+ .publicKey(keyName, fileUriToPath(keyUri))
+ .build())
+ .keyName(keyName)
+ .build();
}
@VisibleForTesting
@@ -501,7 +439,7 @@ public class CmdProduce extends AbstractCmd {
}
try {
- List<byte[]> messageBodies = generateMessageBodies(this.messages,
this.messageFileNames, Schema.BYTES);
+ List<byte[]> messageBodies = generateMessageBodies(this.messages,
this.messageFileNames, null);
RateLimiter limiter = (this.publishRate > 0) ?
RateLimiter.create(this.publishRate) : null;
for (int i = 0; i < this.numTimesProduce; i++) {
int index = i * 10;
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
index e5421ffa6ac..63ecb4e4e76 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdRead.java
@@ -23,22 +23,21 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import java.net.URI;
+import java.time.Duration;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
-import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.Reader;
-import org.apache.pulsar.client.api.ReaderBuilder;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.impl.MessageIdImpl;
+import org.apache.pulsar.client.api.v5.Checkpoint;
+import org.apache.pulsar.client.api.v5.CheckpointConsumer;
+import org.apache.pulsar.client.api.v5.CheckpointConsumerBuilder;
+import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientException;
+import org.apache.pulsar.client.api.v5.auth.ConsumerCryptoFailureAction;
+import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
+import org.apache.pulsar.client.api.v5.schema.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@@ -54,13 +53,11 @@ import picocli.CommandLine.Parameters;
@Command(description = "Read messages from a specified topic")
public class CmdRead extends AbstractCmdConsume {
- private static final Pattern MSG_ID_PATTERN =
Pattern.compile("^(-?[1-9][0-9]*|0):(-?[1-9][0-9]*|0)$");
-
@Parameters(description = "TopicName", arity = "1")
private String topic;
@Option(names = { "-m", "--start-message-id" },
- description = "Initial reader position, it can be 'latest',
'earliest' or '<ledgerId>:<entryId>'")
+ description = "Initial reader position, it can be 'latest' or
'earliest'")
private String startMessageId = "latest";
@Option(names = { "-i", "--start-message-id-inclusive" },
@@ -106,6 +103,9 @@ public class CmdRead extends AbstractCmdConsume {
@Option(names = { "-ca", "--crypto-failure-action" }, description =
"Crypto Failure Action")
private ConsumerCryptoFailureAction cryptoFailureAction =
ConsumerCryptoFailureAction.FAIL;
+ private static final String START_EARLIEST = "earliest";
+ private static final String START_LATEST = "latest";
+
@Option(names = { "-mp", "--print-metadata" }, description = "Message
metadata")
private boolean printMetadata = false;
@@ -123,7 +123,10 @@ public class CmdRead extends AbstractCmdConsume {
if (this.numMessagesToRead < 0) {
throw (new IllegalArgumentException("Number of messages should be
zero or positive."));
}
-
+ if (!START_LATEST.equals(startMessageId) &&
!START_EARLIEST.equals(startMessageId)) {
+ throw new IllegalArgumentException("--start-message-id must be
'latest' or 'earliest'; the "
+ + "'<ledgerId>:<entryId>' form is not supported by this
version of pulsar-client.");
+ }
if (this.serviceURL.startsWith("ws")) {
return readFromWebSocket(topic);
@@ -136,59 +139,51 @@ public class CmdRead extends AbstractCmdConsume {
int numMessagesRead = 0;
int returnCode = 0;
- try (PulsarClient client = clientBuilder.build()){
- ReaderBuilder<?> builder;
-
- Schema<?> schema = poolMessages ? Schema.BYTEBUFFER : Schema.BYTES;
- if ("auto_consume".equals(schemaType)) {
- schema = Schema.AUTO_CONSUME();
- } else if (!"bytes".equals(schemaType)) {
- throw new IllegalArgumentException("schema type must be
'bytes' or 'auto_consume'");
- }
- builder = client.newReader(schema)
- .topic(topic)
- .startMessageId(parseMessageId(startMessageId))
- .poolMessages(poolMessages);
-
- if (this.startMessageIdInclusive) {
- builder.startMessageIdInclusive();
- }
- if (this.maxPendingChunkedMessage > 0) {
-
builder.maxPendingChunkedMessage(this.maxPendingChunkedMessage);
- }
- if (this.receiverQueueSize > 0) {
- builder.receiverQueueSize(this.receiverQueueSize);
- }
+ if ("auto_consume".equals(schemaType)) {
+ throw new IllegalArgumentException("schema type 'auto_consume' is
not supported by this "
+ + "version of pulsar-client; read with 'bytes' (the
default).");
+ } else if (!"bytes".equals(schemaType)) {
+ throw new IllegalArgumentException("schema type must be 'bytes'");
+ }
+ if (!poolMessages) {
+ LOG.info("--pool-messages has no effect on this version of
pulsar-client.");
+ }
+ if (this.startMessageIdInclusive) {
+ LOG.warn("--start-message-id-inclusive has no effect on this
version of pulsar-client.");
+ }
+ if (maxPendingChunkedMessage > 0 ||
autoAckOldestChunkedMessageOnQueueFull) {
+ LOG.warn("Chunked-message knobs (--max_chunked_msg /
--auto_ack_chunk_q_full) have no effect "
+ + "on this version of pulsar-client.");
+ }
-
builder.autoAckOldestChunkedMessageOnQueueFull(this.autoAckOldestChunkedMessageOnQueueFull);
- builder.cryptoFailureAction(cryptoFailureAction);
+ Checkpoint startPosition = START_EARLIEST.equals(startMessageId)
+ ? Checkpoint.earliest() : Checkpoint.latest();
+ try (PulsarClient client = clientBuilder.build()) {
+ CheckpointConsumerBuilder<byte[]> builder =
client.newCheckpointConsumer(Schema.bytes())
+ .topic(topic)
+ .startPosition(startPosition);
if (isNotBlank(this.encKeyValue)) {
- builder.defaultCryptoKeyReader(this.encKeyValue);
+ builder.encryptionPolicy(buildConsumerEncryptionPolicy());
}
- try (Reader<?> reader = builder.create()) {
+ try (CheckpointConsumer<byte[]> reader = builder.create()) {
RateLimiter limiter = (this.readRate > 0) ?
RateLimiter.create(this.readRate) : null;
while (this.numMessagesToRead == 0 || numMessagesRead <
this.numMessagesToRead) {
if (limiter != null) {
limiter.acquire();
}
- Message<?> msg = reader.readNext(5, TimeUnit.SECONDS);
+ Message<byte[]> msg =
reader.receive(Duration.ofSeconds(5));
if (msg == null) {
LOG.debug("No message to read after waiting for 5
seconds.");
} else {
- try {
- numMessagesRead += 1;
- if (!hideContent) {
- System.out.println(MESSAGE_BOUNDARY);
- String output = this.interpretMessage(msg,
displayHex, printMetadata);
- System.out.println(output);
- } else if (numMessagesRead % 1000 == 0) {
- System.out.println("Received " +
numMessagesRead + " messages");
- }
- } finally {
- msg.release();
+ numMessagesRead += 1;
+ if (!hideContent) {
+ System.out.println(MESSAGE_BOUNDARY);
+ System.out.println(this.interpretMessage(msg,
displayHex, printMetadata));
+ } else if (numMessagesRead % 1000 == 0) {
+ System.out.println("Received " + numMessagesRead +
" messages");
}
}
}
@@ -202,7 +197,10 @@ public class CmdRead extends AbstractCmdConsume {
}
return returnCode;
+ }
+ private ConsumerEncryptionPolicy buildConsumerEncryptionPolicy() {
+ return buildFileDecryptionPolicy(this.encKeyValue,
cryptoFailureAction);
}
@VisibleForTesting
@@ -214,16 +212,9 @@ public class CmdRead extends AbstractCmdConsume {
String wsTopic = String.format("%s/%s/%s/%s", topicName.getDomain(),
topicName.getTenant(),
topicName.getNamespacePortion(), topicName.getLocalName());
- String msgIdQueryParam;
- if ("latest".equals(startMessageId) ||
"earliest".equals(startMessageId)) {
- msgIdQueryParam = startMessageId;
- } else {
- MessageId msgId = parseMessageId(startMessageId);
- msgIdQueryParam =
Base64.getEncoder().encodeToString(msgId.toByteArray());
- }
-
+ // Only 'latest' / 'earliest' are accepted (validated in run()).
return String.format("%s/ws/v2/reader/%s?messageId=%s",
serviceURLWithoutTrailingSlash, wsTopic,
- msgIdQueryParam);
+ startMessageId);
}
@SuppressWarnings("deprecation")
@@ -311,22 +302,4 @@ public class CmdRead extends AbstractCmdConsume {
return returnCode;
}
- @VisibleForTesting
- static MessageId parseMessageId(String msgIdStr) {
- MessageId msgId;
- if ("latest".equals(msgIdStr)) {
- msgId = MessageId.latest;
- } else if ("earliest".equals(msgIdStr)) {
- msgId = MessageId.earliest;
- } else {
- Matcher matcher = MSG_ID_PATTERN.matcher(msgIdStr);
- if (matcher.find()) {
- msgId = new MessageIdImpl(Long.parseLong(matcher.group(1)),
Long.parseLong(matcher.group(2)), -1);
- } else {
- throw new IllegalArgumentException("Message ID must be
'latest', 'earliest' or '<ledgerId>:<entryId>'");
- }
- }
- return msgId;
- }
-
}
diff --git
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
index 4203d75ba4a..481f8266ed0 100644
---
a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
+++
b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/PulsarClientTool.java
@@ -22,19 +22,19 @@ import static
org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.annotations.VisibleForTesting;
import java.io.FileInputStream;
import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.pulsar.cli.converters.picocli.ByteUnitToLongConverter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
-import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
-import org.apache.pulsar.client.api.PulsarClient;
import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
-import org.apache.pulsar.client.api.SizeUnit;
+import org.apache.pulsar.client.api.v5.PulsarClient;
+import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
+import org.apache.pulsar.client.api.v5.config.ConnectionPolicy;
+import org.apache.pulsar.client.api.v5.config.MemorySize;
+import org.apache.pulsar.client.api.v5.config.TlsPolicy;
import org.apache.pulsar.internal.CommandHook;
import org.apache.pulsar.internal.CommanderFactory;
import picocli.CommandLine;
@@ -130,37 +130,72 @@ public class PulsarClientTool implements CommandHook {
commander.addSubcommand("consume", consumeCommand);
commander.addSubcommand("read", readCommand);
commander.addSubcommand("generate_documentation",
generateDocumentation);
+ enableCaseInsensitiveEnums();
+ }
+
+ /**
+ * Accept enum flag values regardless of case across the root command and
all subcommands. The
+ * V5 client enums are uppercase (LATEST, EARLIEST, FAIL, ...) while users
have long passed the
+ * mixed-case v4 spellings (Latest, Earliest, ...); case-insensitive
parsing keeps that flag UX
+ * working. Picocli's {@code setCaseInsensitiveEnumValuesAllowed} does not
propagate to
+ * subcommands, so it must be applied to each command explicitly.
+ */
+ private void enableCaseInsensitiveEnums() {
+ applyCaseInsensitiveEnums(commander);
+ }
+
+ private static void applyCaseInsensitiveEnums(CommandLine cmd) {
+ cmd.setCaseInsensitiveEnumValuesAllowed(true);
+
cmd.getSubcommands().values().forEach(PulsarClientTool::applyCaseInsensitiveEnums);
}
protected void addCommand(String name, Object cmd) {
commander.addSubcommand(name, cmd);
+ enableCaseInsensitiveEnums();
}
private int updateConfig() throws UnsupportedAuthenticationException {
- Map<String, Object> conf = new HashMap<>();
Properties properties = pulsarClientPropertiesProvider.getProperties();
- for (String key : properties.stringPropertyNames()) {
- conf.put(key, properties.getProperty(key));
- }
- ClientBuilder clientBuilder = PulsarClient.builder().loadConf(conf)
- .memoryLimit(rootParams.memoryLimit, SizeUnit.BYTES);
+ PulsarClientBuilder clientBuilder = PulsarClient.builder()
+ .memoryLimit(MemorySize.ofBytes(rootParams.memoryLimit));
+
+ // The v4 Authentication object is still needed by the WebSocket
produce/consume path,
+ // which talks HTTP and is not migrated to the binary-only V5 client.
Authentication authentication = null;
if (isNotBlank(this.rootParams.authPluginClassName)) {
authentication =
AuthenticationFactory.create(rootParams.authPluginClassName,
rootParams.authParams);
- clientBuilder.authentication(authentication);
+ try {
+ clientBuilder.authentication(rootParams.authPluginClassName,
rootParams.authParams);
+ } catch (org.apache.pulsar.client.api.v5.PulsarClientException e) {
+ throw new UnsupportedAuthenticationException(e);
+ }
}
if (isNotBlank(this.rootParams.listenerName)) {
clientBuilder.listenerName(this.rootParams.listenerName);
}
- clientBuilder.serviceUrl(rootParams.serviceURL);
-
clientBuilder.tlsTrustCertsFilePath(this.rootParams.tlsTrustCertsFilePath);
+
+ // serviceUrl is only set on the V5 (binary) client for pulsar:// /
pulsar+ssl:// URLs.
+ // A ws:// URL means the WebSocket path is used instead, which never
builds a V5 client,
+ // and the V5 builder rejects non-broker schemes at configure time.
+ String serviceUrl = rootParams.serviceURL;
+ if (serviceUrl != null
+ && (serviceUrl.startsWith("pulsar://") ||
serviceUrl.startsWith("pulsar+ssl://"))) {
+ clientBuilder.serviceUrl(serviceUrl);
+ }
+
+ applyTlsPolicy(clientBuilder, serviceUrl, properties);
+
if (isNotBlank(rootParams.proxyServiceURL)) {
if (rootParams.proxyProtocol == null) {
commander.getErr().println("proxy-protocol must be provided
with proxy-url");
return 1;
}
- clientBuilder.proxyServiceUrl(rootParams.proxyServiceURL,
rootParams.proxyProtocol);
+ clientBuilder.connectionPolicy(ConnectionPolicy.builder()
+ .proxy(rootParams.proxyServiceURL,
+
org.apache.pulsar.client.api.v5.config.ProxyProtocol.valueOf(
+ rootParams.proxyProtocol.name()))
+ .build());
}
this.produceCommand.updateConfig(clientBuilder, authentication,
this.rootParams.serviceURL);
this.consumeCommand.updateConfig(clientBuilder, authentication,
this.rootParams.serviceURL);
@@ -168,6 +203,46 @@ public class PulsarClientTool implements CommandHook {
return 0;
}
+ /**
+ * Translate the client.conf TLS settings onto the typed V5 {@link
TlsPolicy}. V5 has no
+ * untyped {@code loadConf}, so the conf-file keys that have no dedicated
CLI flag
+ * ({@code tlsAllowInsecureConnection}, {@code
tlsEnableHostnameVerification}, the mTLS
+ * cert/key paths) are read from the properties here.
+ *
+ * <p>TLS is enabled only when the service URL uses {@code pulsar+ssl://}
or the conf sets
+ * {@code useTls=true}; otherwise we leave the policy untouched so a
plaintext broker is not
+ * accidentally contacted over TLS (calling {@code tlsPolicy()} always
flips {@code useTls}
+ * on). Keystore TLS has no V5 equivalent and is reported as unsupported.
+ */
+ private void applyTlsPolicy(PulsarClientBuilder clientBuilder, String
serviceUrl, Properties properties) {
+ boolean tlsByUrl = serviceUrl != null &&
serviceUrl.startsWith("pulsar+ssl://");
+ boolean tlsByConf =
Boolean.parseBoolean(properties.getProperty("useTls", "false"));
+ if (!tlsByUrl && !tlsByConf) {
+ return;
+ }
+ if (Boolean.parseBoolean(properties.getProperty("useKeyStoreTls",
"false"))) {
+ commander.getErr().println("Warning: keystore TLS (useKeyStoreTls)
is not supported by the "
+ + "V5-based pulsar-client; PEM trust/cert/key settings are
used instead.");
+ }
+ TlsPolicy.Builder tls = TlsPolicy.builder()
+ .allowInsecureConnection(
+
Boolean.parseBoolean(properties.getProperty("tlsAllowInsecureConnection",
"false")))
+ .enableHostnameVerification(
+
Boolean.parseBoolean(properties.getProperty("tlsEnableHostnameVerification",
"false")));
+ if (isNotBlank(rootParams.tlsTrustCertsFilePath)) {
+ tls.trustCertsFilePath(rootParams.tlsTrustCertsFilePath);
+ }
+ String certFile = properties.getProperty("tlsCertificateFilePath");
+ if (isNotBlank(certFile)) {
+ tls.certificateFilePath(certFile);
+ }
+ String keyFile = properties.getProperty("tlsKeyFilePath");
+ if (isNotBlank(keyFile)) {
+ tls.keyFilePath(keyFile);
+ }
+ clientBuilder.tlsPolicy(tls.build());
+ }
+
public int run(String[] args) {
return commander.execute(args);
}
@@ -199,6 +274,7 @@ public class PulsarClientTool implements CommandHook {
commander.getCommandSpec().removeSubcommand("produce");
}
commander.addSubcommand("produce", this.produceCommand);
+ enableCaseInsensitiveEnums();
}
@VisibleForTesting
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdConsume.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdConsume.java
index 9834c49f248..0c49cde9539 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdConsume.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdConsume.java
@@ -20,7 +20,9 @@ package org.apache.pulsar.client.cli;
import static org.testng.Assert.assertEquals;
import java.lang.reflect.Field;
+import java.util.Properties;
import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
public class TestCmdConsume {
@@ -44,4 +46,25 @@ public class TestCmdConsume {
+
"?subscriptionType=Exclusive&subscriptionMode=Durable");
}
+ @DataProvider(name = "mixedCaseEnumArgs")
+ public Object[][] mixedCaseEnumArgs() {
+ // The V5 client enums are uppercase; the CLI must still accept the
mixed-case v4 spellings
+ // on the (sub)commands. picocli does not propagate case-insensitive
parsing to subcommands,
+ // so this guards the explicit per-command wiring in PulsarClientTool.
+ return new Object[][] {
+ {"-p", "Earliest"}, {"-p", "earliest"}, {"-p", "EARLIEST"}, {"-p",
"Latest"},
+ {"-t", "Exclusive"}, {"-t", "Shared"}, {"-t", "Failover"},
+ {"-m", "NonDurable"}, {"-ca", "DISCARD"},
+ };
+ }
+
+ @Test(dataProvider = "mixedCaseEnumArgs")
+ public void testCaseInsensitiveEnumFlags(String flag, String value) {
+ Properties properties = new Properties();
+ properties.setProperty("serviceUrl", "pulsar://localhost:6650");
+ PulsarClientTool tool = new PulsarClientTool(properties);
+ // Must not throw a picocli ParameterException for the mixed-case enum
value.
+ tool.getCommander().parseArgs("consume", "-s", "sub", flag, value,
+ "persistent://public/default/t");
+ }
}
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
index 1b918fbe8dd..c2ccc912ec2 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdProduce.java
@@ -20,21 +20,25 @@ package org.apache.pulsar.client.cli;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.expectThrows;
import java.util.Collections;
import java.util.List;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
-import org.apache.pulsar.client.api.Schema;
-import org.apache.pulsar.client.api.schema.KeyValueSchema;
-import org.apache.pulsar.common.schema.KeyValueEncodingType;
-import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.client.api.v5.schema.SchemaType;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestCmdProduce {
+ private static final String AVRO_DEF = "{\"type\":
\"record\",\"namespace\": \"com.example\","
+ + "\"name\": \"FullName\", \"fields\": [{ \"name\": \"a\",
\"type\": \"string\" },"
+ + "{ \"name\": \"b\", \"type\": \"int\" }]}";
+
CmdProduce cmdProduce;
@BeforeMethod
@@ -51,56 +55,42 @@ public class TestCmdProduce {
}
@Test
- public void testBuildSchema() {
- // default
- assertEquals(SchemaType.BYTES, CmdProduce.buildSchema("string",
"bytes",
-
CmdProduce.KEY_VALUE_ENCODING_TYPE_NOT_SET).getSchemaInfo().getType());
-
- // simple key value
- assertEquals(SchemaType.KEY_VALUE, CmdProduce.buildSchema("string",
"string",
- "separated").getSchemaInfo().getType());
- assertEquals(SchemaType.KEY_VALUE, CmdProduce.buildSchema("string",
"string",
- "inline").getSchemaInfo().getType());
-
- KeyValueSchema<?, ?> composite1 = (KeyValueSchema<?, ?>)
CmdProduce.buildSchema("string",
- "json:{\"type\": \"record\",\"namespace\":
\"com.example\",\"name\": \"FullName\", \"fields\":"
- + " [{ \"name\": \"a\", \"type\": \"string\" }]}",
- "inline");
- assertEquals(KeyValueEncodingType.INLINE,
composite1.getKeyValueEncodingType());
- assertEquals(SchemaType.STRING,
composite1.getKeySchema().getSchemaInfo().getType());
- assertEquals(SchemaType.JSON,
composite1.getValueSchema().getSchemaInfo().getType());
-
- KeyValueSchema<?, ?> composite2 = (KeyValueSchema<?, ?>)
CmdProduce.buildSchema(
- "json:{\"type\": \"record\",\"namespace\":
\"com.example\",\"name\": \"FullName\", \"fields"
- + "\": [{ \"name\": \"a\", \"type\": \"string\" }]}",
- "avro:{\"type\": \"record\",\"namespace\":
\"com.example\",\"name\": \"FullName\", \"fields\":"
- + " [{ \"name\": \"a\", \"type\": \"string\" }]}",
- "inline");
- assertEquals(KeyValueEncodingType.INLINE,
composite2.getKeyValueEncodingType());
- assertEquals(SchemaType.JSON,
composite2.getKeySchema().getSchemaInfo().getType());
- assertEquals(SchemaType.AVRO,
composite2.getValueSchema().getSchemaInfo().getType());
+ public void testBuildValueSchema() {
+ // bytes -> raw BYTES, no native Avro schema.
+ CmdProduce.ValueSchema bytes = CmdProduce.buildValueSchema("bytes");
+ assertEquals(bytes.schema().schemaInfo().type(), SchemaType.BYTES);
+ assertNull(bytes.avroNative());
+
+ // string -> AUTO_PRODUCE_BYTES wrapping string; no native Avro schema.
+ CmdProduce.ValueSchema string = CmdProduce.buildValueSchema("string");
+ assertNotNull(string.schema());
+ assertNull(string.avroNative());
+
+ // avro:<def> -> AUTO_PRODUCE_BYTES wrapping a generic Avro schema;
native Avro present.
+ CmdProduce.ValueSchema avro = CmdProduce.buildValueSchema("avro:" +
AVRO_DEF);
+ assertNotNull(avro.schema());
+ assertNotNull(avro.avroNative());
+
+ // json:<def> -> AUTO_PRODUCE_BYTES wrapping a generic JSON schema; no
native Avro schema.
+ CmdProduce.ValueSchema json = CmdProduce.buildValueSchema("json:" +
AVRO_DEF);
+ assertNotNull(json.schema());
+ assertNull(json.avroNative());
+
+ // unknown -> rejected.
+ expectThrows(IllegalArgumentException.class, () ->
CmdProduce.buildValueSchema("nope"));
}
@Test
public void generateAvroMessageBodies() throws Exception {
-
- Schema<?> schema = CmdProduce.buildSchema(
- null,
- "avro:{\"type\": \"record\",\"namespace\":
\"com.example\",\"name\": \"FullName\", \"fields\":"
- + " [{ \"name\": \"a\", \"type\": \"string\" },"
- + "{ \"name\": \"b\", \"type\": \"int\" }"
- + "]}",
- "");
+ CmdProduce.ValueSchema vs = CmdProduce.buildValueSchema("avro:" +
AVRO_DEF);
List<byte[]> bytes =
CmdProduce.generateMessageBodies(List.of("{\"a\":\"stringValue\",\"b\":123}"),
- Collections.emptyList(), schema);
+ Collections.emptyList(), vs.avroNative());
assertEquals(bytes.size(), 1);
- org.apache.avro.Schema avro = (org.apache.avro.Schema)
schema.getNativeSchema().get();
- GenericDatumReader<GenericRecord> reader = new
GenericDatumReader<>(avro);
+ GenericDatumReader<GenericRecord> reader = new
GenericDatumReader<>(vs.avroNative());
GenericRecord record = reader.read(null,
DecoderFactory.get().binaryDecoder(bytes.get(0), null));
assertEquals("stringValue", record.get("a").toString());
assertEquals(123, record.get("b"));
-
}
}
diff --git
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
index 45f508f25cb..d95476986a8 100644
---
a/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
+++
b/pulsar-client-tools/src/test/java/org/apache/pulsar/client/cli/TestCmdRead.java
@@ -19,11 +19,7 @@
package org.apache.pulsar.client.cli;
import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
import java.lang.reflect.Field;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.client.impl.MessageIdImpl;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -31,11 +27,11 @@ public class TestCmdRead {
@DataProvider(name = "startMessageIds")
public Object[][] startMessageIds() {
+ // The V5-based reader (CheckpointConsumer) only accepts latest /
earliest; the
+ // <ledgerId>:<entryId> form is no longer supported, so it is not
exercised here.
return new Object[][] {
{ "latest", "latest" },
{ "earliest", "earliest" },
- { "10:0", "CAoQADAA" },
- { "10:1", "CAoQATAA" },
};
}
@@ -51,19 +47,4 @@ public class TestCmdRead {
assertEquals(cmdRead.getWebSocketReadUri(topicNameV2),
"ws://localhost:8080/ws/v2/reader/persistent/public/default/t2?messageId=" +
msgIdQueryParam);
}
-
- @Test
- public void testParseMessageId() {
- assertEquals(CmdRead.parseMessageId("latest"), MessageId.latest);
- assertEquals(CmdRead.parseMessageId("earliest"), MessageId.earliest);
- assertEquals(CmdRead.parseMessageId("20:-1"), new MessageIdImpl(20,
-1, -1));
- assertEquals(CmdRead.parseMessageId("30:0"), new MessageIdImpl(30, 0,
-1));
- try {
- CmdRead.parseMessageId("invalid");
- fail("Should fail to parse invalid message ID");
- } catch (Throwable t) {
- assertTrue(t instanceof IllegalArgumentException);
- }
- }
-
}
diff --git a/pulsar-client-v5/build.gradle.kts
b/pulsar-client-v5/build.gradle.kts
index 9371a8eae82..0a5ebda446d 100644
--- a/pulsar-client-v5/build.gradle.kts
+++ b/pulsar-client-v5/build.gradle.kts
@@ -28,7 +28,11 @@ dependencies {
implementation(libs.slf4j.api)
implementation(libs.slog)
implementation(libs.opentelemetry.api)
- implementation(libs.protobuf.java)
+ // protobuf-java is only needed at compile time (for the Schema.protobuf()
signature, which
+ // references com.google.protobuf.Message). Keeping it compileOnly — as
pulsar-client and
+ // pulsar-client-api-v5 do — avoids dragging protobuf into every
distribution that bundles the
+ // V5 client; callers that actually use protobuf schemas bring
protobuf-java themselves.
+ compileOnly(libs.protobuf.java)
implementation(libs.netty.handler)
implementation(libs.jackson.annotations)
compileOnly(libs.lombok)
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
index 7e8971a4ff9..0439f67b285 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/PulsarClientProviderV5.java
@@ -124,6 +124,18 @@ public final class PulsarClientProviderV5 implements
PulsarClientProvider {
return
SchemaAdapter.toV5(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES());
}
+ @Override
+ public Schema<?>
genericSchema(org.apache.pulsar.client.api.v5.schema.SchemaInfo schemaInfo) {
+ var v4Info = SchemaAdapter.toV4SchemaInfo(schemaInfo);
+ return
SchemaAdapter.toV5(org.apache.pulsar.client.api.Schema.generic(v4Info));
+ }
+
+ @Override
+ public Schema<byte[]> autoProduceBytesSchema(Schema<?> base) {
+ var v4Base = SchemaAdapter.toV4(base);
+ return
SchemaAdapter.toV5(org.apache.pulsar.client.api.Schema.AUTO_PRODUCE_BYTES(v4Base));
+ }
+
// --- Checkpoint ---
@Override
diff --git
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SchemaAdapter.java
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SchemaAdapter.java
index e12b1a85b74..910f23a5a0a 100644
---
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SchemaAdapter.java
+++
b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/SchemaAdapter.java
@@ -54,6 +54,19 @@ final class SchemaAdapter {
return new V4SchemaWrapper<>(v5Schema);
}
+ /**
+ * Convert a v5 SchemaInfo into a v4 SchemaInfo. Used to build v4 generic
schemas from a
+ * v5-supplied definition. The schema-type enum names are identical across
v4 and v5.
+ */
+ static org.apache.pulsar.common.schema.SchemaInfo
toV4SchemaInfo(SchemaInfo v5Info) {
+ return SchemaInfoImpl.builder()
+ .name(v5Info.name())
+
.type(org.apache.pulsar.common.schema.SchemaType.valueOf(v5Info.type().name()))
+ .schema(v5Info.schema())
+ .properties(v5Info.properties())
+ .build();
+ }
+
/**
* Wraps a v4 Schema as a v5 Schema.
*/
diff --git
a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
new file mode 100644
index 00000000000..db4cc2b6ec6
--- /dev/null
+++
b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/SchemaFactoryTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.v5;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import java.nio.charset.StandardCharsets;
+import org.apache.pulsar.client.api.v5.schema.Schema;
+import org.apache.pulsar.client.api.v5.schema.SchemaInfo;
+import org.apache.pulsar.client.api.v5.schema.SchemaType;
+import org.testng.annotations.Test;
+
+/**
+ * Covers the V5 Schema factories added for the pulsar-client CLI migration:
+ * {@link Schema#generic(SchemaInfo)} and {@link
Schema#autoProduceBytesOf(Schema)}, plus the
+ * {@link SchemaInfo#of} builder they consume.
+ */
+public class SchemaFactoryTest {
+
+ private static final String AVRO_DEF =
+
"{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"a\",\"type\":\"string\"}]}";
+
+ @Test
+ public void testSchemaInfoOfRoundTrips() {
+ SchemaInfo info = SchemaInfo.of("client", SchemaType.AVRO,
+ AVRO_DEF.getBytes(StandardCharsets.UTF_8), null);
+ assertEquals(info.name(), "client");
+ assertEquals(info.type(), SchemaType.AVRO);
+ assertEquals(new String(info.schema(), StandardCharsets.UTF_8),
AVRO_DEF);
+ assertNotNull(info.properties());
+ assertEquals(info.properties().size(), 0);
+ }
+
+ @Test
+ public void testGenericSchemaFromAvroDefinition() {
+ SchemaInfo info = SchemaInfo.of("client", SchemaType.AVRO,
+ AVRO_DEF.getBytes(StandardCharsets.UTF_8), null);
+ Schema<?> generic = Schema.generic(info);
+ assertNotNull(generic);
+ assertEquals(generic.schemaInfo().type(), SchemaType.AVRO);
+ }
+
+ @Test
+ public void testAutoProduceBytesOfWrapsBase() {
+ // The CLI wraps a typed base schema so pre-encoded bytes are
validated against it.
+ Schema<byte[]> wrapped = Schema.autoProduceBytesOf(Schema.string());
+ assertNotNull(wrapped);
+ // AUTO_PRODUCE_BYTES encodes raw bytes straight through.
+ byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
+ assertEquals(wrapped.encode(payload), payload);
+ }
+}