This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 36270cce874 [improve][test] pulsar-perf consume: switch between V5
Queue and Stream consumer (#25981)
36270cce874 is described below
commit 36270cce874f209a8d9a2abdace68960049dc0de
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jun 9 00:04:35 2026 -0700
[improve][test] pulsar-perf consume: switch between V5 Queue and Stream
consumer (#25981)
---
.../pulsar/testclient/PerformanceConsumer.java | 178 ++++++++++++++++-----
.../testclient/PerformanceConsumerArgsTest.java | 53 ++++++
2 files changed, 187 insertions(+), 44 deletions(-)
diff --git
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 6c0f101edd8..448e668ffc2 100644
---
a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++
b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -30,9 +30,9 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -43,10 +43,13 @@ import org.HdrHistogram.Histogram;
import org.HdrHistogram.HistogramLogWriter;
import org.HdrHistogram.Recorder;
import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.MessageId;
import org.apache.pulsar.client.api.v5.PulsarClient;
import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
import org.apache.pulsar.client.api.v5.QueueConsumer;
import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
+import org.apache.pulsar.client.api.v5.StreamConsumer;
+import org.apache.pulsar.client.api.v5.StreamConsumerBuilder;
import org.apache.pulsar.client.api.v5.Transaction;
import org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider;
import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
@@ -75,6 +78,18 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
Key_Shared
}
+ /**
+ * Which V5 scalable-topic consumer API to drive. {@code Queue} gives
unordered,
+ * individually-acked work distribution; {@code Stream} gives ordered,
cumulatively-acked
+ * consumption with broker-coordinated 1:1 segment-to-consumer assignment.
Switching to
+ * {@code Stream} with more consumers than segments is the handle for
exercising the
+ * auto-split feature (PIP-483).
+ */
+ public enum ScalableConsumerType {
+ Queue,
+ Stream
+ }
+
private static final LongAdder messagesReceived = new LongAdder();
private static final LongAdder bytesReceived = new LongAdder();
@@ -117,6 +132,12 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
@Option(names = { "-st", "--subscription-type" }, description =
"Subscription type")
public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
+ @Option(names = { "-sct", "--scalable-consumer-type" },
+ description = "V5 scalable-topic consumer API to use: Queue
(unordered, individual ack) "
+ + "or Stream (ordered, cumulative ack, 1:1 segment
assignment). Use Stream with "
+ + "more consumers than segments to drive auto-split
(PIP-483).")
+ public ScalableConsumerType scalableConsumerType =
ScalableConsumerType.Queue;
+
@Option(names = { "-sp", "--subscription-position" }, description =
"Subscription position")
private SubscriptionInitialPosition subscriptionInitialPosition =
SubscriptionInitialPosition.LATEST;
@@ -297,60 +318,28 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
Semaphore messageReceiveLimiter = new
Semaphore(this.numMessagesPerTransaction);
Thread thread = Thread.currentThread();
- QueueConsumerBuilder<byte[]> consumerBuilder =
pulsarClient.newQueueConsumer(Schema.bytes())
- .receiverQueueSize(this.receiverQueueSize)
-
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
- .subscriptionInitialPosition(this.subscriptionInitialPosition);
-
- if (isNotBlank(this.encKeyFile)) {
- // We do not know the key name from --encryption-key-value-file
alone; PemFileKeyProvider
- // expects a name → path mapping. The encryption test path uses
subscribers that name keys
- // explicitly; here we register the file under the same name the
producer side used
- // (defaults to the file path's last component if unset upstream).
- String keyName = Path.of(this.encKeyFile).getFileName().toString();
- PemFileKeyProvider keys = PemFileKeyProvider.builder()
- .privateKey(keyName, Path.of(this.encKeyFile))
- .build();
- consumerBuilder.encryptionPolicy(ConsumerEncryptionPolicy.builder()
- .privateKeyProvider(keys)
- .build());
- }
+ final ConsumerEncryptionPolicy encryptionPolicy =
buildEncryptionPolicyOrNull();
- List<Future<QueueConsumer<byte[]>>> futures = new ArrayList<>();
+ List<CompletableFuture<PerfConsumer>> futures = new ArrayList<>();
for (int i = 0; i < this.numTopics; i++) {
final TopicName topicName = TopicName.get(this.topics.get(i));
log.info()
.attr("adding", this.numConsumers)
.attr("topic", topicName)
+ .attr("consumerType", this.scalableConsumerType)
.log("Adding consumers per subscription on topic");
for (int j = 0; j < this.numSubscriptions; j++) {
String subscriberName = this.subscriptions.get(j);
for (int k = 0; k < this.numConsumers; k++) {
- // V5 QueueConsumerBuilder has no clone(); build
per-consumer to set topic+sub.
- QueueConsumerBuilder<byte[]> b =
pulsarClient.newQueueConsumer(Schema.bytes())
- .receiverQueueSize(this.receiverQueueSize)
-
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
-
.subscriptionInitialPosition(this.subscriptionInitialPosition)
-
.replicateSubscriptionState(this.replicatedSubscription)
- .topic(topicName.toString())
- .subscriptionName(subscriberName);
- if (isNotBlank(this.encKeyFile)) {
- String keyName =
Path.of(this.encKeyFile).getFileName().toString();
- PemFileKeyProvider keys = PemFileKeyProvider.builder()
- .privateKey(keyName, Path.of(this.encKeyFile))
- .build();
- b.encryptionPolicy(ConsumerEncryptionPolicy.builder()
- .privateKeyProvider(keys)
- .build());
- }
- futures.add(b.subscribeAsync());
+ futures.add(subscribeAsync(pulsarClient,
topicName.toString(), subscriberName,
+ encryptionPolicy));
}
}
}
- final List<QueueConsumer<byte[]>> consumers = new
ArrayList<>(futures.size());
- for (Future<QueueConsumer<byte[]>> future : futures) {
+ final List<PerfConsumer> consumers = new ArrayList<>(futures.size());
+ for (CompletableFuture<PerfConsumer> future : futures) {
consumers.add(future.get());
}
@@ -359,7 +348,7 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
// per consumer mirrors the v4 dispatch concurrency closely enough for
the perf workload.
ExecutorService consumerExec = Executors.newCachedThreadPool(
new DefaultThreadFactory("pulsar-perf-consumer-poll"));
- for (QueueConsumer<byte[]> consumer : consumers) {
+ for (PerfConsumer consumer : consumers) {
consumerExec.submit(() -> pollLoop(consumer, atomicReference,
messageAckedCount,
messageReceiveLimiter, limiter, testEndTime, thread,
pulsarClient));
}
@@ -466,7 +455,7 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
* dedicated thread that drives {@code receive(timeout)} and runs the same
per-message
* handler the v4 listener did (latency record, rate-limit, ack,
transaction commit/rollover).
*/
- private void pollLoop(QueueConsumer<byte[]> consumer,
+ private void pollLoop(PerfConsumer consumer,
AtomicReference<Transaction> atomicReference,
AtomicLong messageAckedCount,
Semaphore messageReceiveLimiter,
@@ -543,7 +532,7 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
}
Transaction txn = atomicReference.get();
try {
- consumer.acknowledge(msg.id(), txn);
+ consumer.ackTxn(msg.id(), txn);
totalMessageAck.increment();
messageAck.increment();
} catch (Exception e) {
@@ -556,7 +545,7 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
}
} else {
try {
- consumer.acknowledge(msg.id());
+ consumer.ack(msg.id());
totalMessageAck.increment();
messageAck.increment();
} catch (Exception e) {
@@ -628,6 +617,107 @@ public class PerformanceConsumer extends
PerformanceTopicListArguments{
}
}
+ /**
+ * Minimal common view over the V5 {@link QueueConsumer} / {@link
StreamConsumer} APIs so the
+ * poll loop is independent of which scalable-topic consumer type was
selected. The ack methods
+ * map to {@code acknowledge} for Queue and {@code acknowledgeCumulative}
for Stream.
+ */
+ private interface PerfConsumer {
+ Message<byte[]> receive(Duration timeout) throws Exception;
+
+ void ack(MessageId messageId) throws Exception;
+
+ void ackTxn(MessageId messageId, Transaction txn) throws Exception;
+ }
+
+ private CompletableFuture<PerfConsumer> subscribeAsync(PulsarClient
client, String topic,
+ String subscription,
+
ConsumerEncryptionPolicy encryptionPolicy) {
+ if (this.scalableConsumerType == ScalableConsumerType.Stream) {
+ // StreamConsumer has no receiverQueueSize knob; the rest carries
over. Deliberately
+ // do NOT set a consumerName: the controller keys group membership
by consumer name,
+ // so the V5 client's auto-generated unique name keeps every
consumer — within one
+ // process and across separate `pulsar-perf consume` invocations —
a distinct member.
+ // (Setting a deterministic name would make two processes collide
and the second be
+ // treated as a reconnect of the first.)
+ StreamConsumerBuilder<byte[]> b =
client.newStreamConsumer(Schema.bytes())
+
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
+
.subscriptionInitialPosition(this.subscriptionInitialPosition)
+ .replicateSubscriptionState(this.replicatedSubscription)
+ .topic(topic)
+ .subscriptionName(subscription);
+ if (encryptionPolicy != null) {
+ b.encryptionPolicy(encryptionPolicy);
+ }
+ return b.subscribeAsync().thenApply(PerformanceConsumer::wrap);
+ }
+ QueueConsumerBuilder<byte[]> b =
client.newQueueConsumer(Schema.bytes())
+ .receiverQueueSize(this.receiverQueueSize)
+
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
+ .subscriptionInitialPosition(this.subscriptionInitialPosition)
+ .replicateSubscriptionState(this.replicatedSubscription)
+ .topic(topic)
+ .subscriptionName(subscription);
+ if (encryptionPolicy != null) {
+ b.encryptionPolicy(encryptionPolicy);
+ }
+ return b.subscribeAsync().thenApply(PerformanceConsumer::wrap);
+ }
+
+ private ConsumerEncryptionPolicy buildEncryptionPolicyOrNull() {
+ if (!isNotBlank(this.encKeyFile)) {
+ return null;
+ }
+ // We do not know the key name from --encryption-key-value-file alone;
PemFileKeyProvider
+ // expects a name → path mapping. Register the file under the same
name the producer side
+ // used (defaults to the file path's last component if unset upstream).
+ String keyName = Path.of(this.encKeyFile).getFileName().toString();
+ PemFileKeyProvider keys = PemFileKeyProvider.builder()
+ .privateKey(keyName, Path.of(this.encKeyFile))
+ .build();
+ return ConsumerEncryptionPolicy.builder()
+ .privateKeyProvider(keys)
+ .build();
+ }
+
+ private static PerfConsumer wrap(QueueConsumer<byte[]> consumer) {
+ return new PerfConsumer() {
+ @Override
+ public Message<byte[]> receive(Duration timeout) throws Exception {
+ return consumer.receive(timeout);
+ }
+
+ @Override
+ public void ack(MessageId messageId) throws Exception {
+ consumer.acknowledge(messageId);
+ }
+
+ @Override
+ public void ackTxn(MessageId messageId, Transaction txn) throws
Exception {
+ consumer.acknowledge(messageId, txn);
+ }
+ };
+ }
+
+ private static PerfConsumer wrap(StreamConsumer<byte[]> consumer) {
+ return new PerfConsumer() {
+ @Override
+ public Message<byte[]> receive(Duration timeout) throws Exception {
+ return consumer.receive(timeout);
+ }
+
+ @Override
+ public void ack(MessageId messageId) throws Exception {
+ consumer.acknowledgeCumulative(messageId);
+ }
+
+ @Override
+ public void ackTxn(MessageId messageId, Transaction txn) throws
Exception {
+ consumer.acknowledgeCumulative(messageId, txn);
+ }
+ };
+ }
+
private void printAggregatedThroughput(long start) {
double elapsed = (System.nanoTime() - start) / 1e9;
double rate = totalMessagesReceived.sum() / elapsed;
diff --git
a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceConsumerArgsTest.java
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceConsumerArgsTest.java
new file mode 100644
index 00000000000..0af41fe647f
--- /dev/null
+++
b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceConsumerArgsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static org.testng.Assert.assertEquals;
+import org.testng.annotations.Test;
+import picocli.CommandLine;
+
+public class PerformanceConsumerArgsTest {
+
+ private static PerformanceConsumer parse(String... args) {
+ PerformanceConsumer consumer = new PerformanceConsumer();
+ new CommandLine(consumer).parseArgs(args);
+ return consumer;
+ }
+
+ @Test
+ public void testScalableConsumerTypeDefaultsToQueue() {
+ PerformanceConsumer consumer = parse("my-topic");
+ assertEquals(consumer.scalableConsumerType,
+ PerformanceConsumer.ScalableConsumerType.Queue);
+ }
+
+ @Test
+ public void testScalableConsumerTypeStreamLongOption() {
+ PerformanceConsumer consumer = parse("--scalable-consumer-type",
"Stream", "my-topic");
+ assertEquals(consumer.scalableConsumerType,
+ PerformanceConsumer.ScalableConsumerType.Stream);
+ }
+
+ @Test
+ public void testScalableConsumerTypeShortOption() {
+ PerformanceConsumer consumer = parse("-sct", "Queue", "my-topic");
+ assertEquals(consumer.scalableConsumerType,
+ PerformanceConsumer.ScalableConsumerType.Queue);
+ }
+}