This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 36270cce874 [improve][test] pulsar-perf consume: switch between V5 Queue and Stream consumer (#25981)
36270cce874 is described below

commit 36270cce874f209a8d9a2abdace68960049dc0de
Author: Matteo Merli <[email protected]>
AuthorDate: Tue Jun 9 00:04:35 2026 -0700

    [improve][test] pulsar-perf consume: switch between V5 Queue and Stream consumer (#25981)
---
 .../pulsar/testclient/PerformanceConsumer.java     | 178 ++++++++++++++++-----
 .../testclient/PerformanceConsumerArgsTest.java    |  53 ++++++
 2 files changed, 187 insertions(+), 44 deletions(-)

diff --git a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
index 6c0f101edd8..448e668ffc2 100644
--- a/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
+++ b/pulsar-testclient/src/main/java/org/apache/pulsar/testclient/PerformanceConsumer.java
@@ -30,9 +30,9 @@ import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,10 +43,13 @@ import org.HdrHistogram.Histogram;
 import org.HdrHistogram.HistogramLogWriter;
 import org.HdrHistogram.Recorder;
 import org.apache.pulsar.client.api.v5.Message;
+import org.apache.pulsar.client.api.v5.MessageId;
 import org.apache.pulsar.client.api.v5.PulsarClient;
 import org.apache.pulsar.client.api.v5.PulsarClientBuilder;
 import org.apache.pulsar.client.api.v5.QueueConsumer;
 import org.apache.pulsar.client.api.v5.QueueConsumerBuilder;
+import org.apache.pulsar.client.api.v5.StreamConsumer;
+import org.apache.pulsar.client.api.v5.StreamConsumerBuilder;
 import org.apache.pulsar.client.api.v5.Transaction;
 import org.apache.pulsar.client.api.v5.auth.PemFileKeyProvider;
 import org.apache.pulsar.client.api.v5.config.ConsumerEncryptionPolicy;
@@ -75,6 +78,18 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
         Key_Shared
     }
 
+    /**
+     * Selects which V5 scalable-topic consumer API the perf consumer drives
+     * ({@code -sct} / {@code --scalable-consumer-type}; defaults to {@code Queue}).
+     * <ul>
+     *   <li>{@code Queue} gives unordered, individually-acked work distribution.</li>
+     *   <li>{@code Stream} gives ordered, cumulatively-acked consumption with
+     *       broker-coordinated 1:1 segment-to-consumer assignment.</li>
+     * </ul>
+     * Switching to {@code Stream} with more consumers than segments is the handle for
+     * exercising the auto-split feature (PIP-483).
+     */
+    public enum ScalableConsumerType {
+        Queue,
+        Stream
+    }
+
     private static final LongAdder messagesReceived = new LongAdder();
     private static final LongAdder bytesReceived = new LongAdder();
 
@@ -117,6 +132,12 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
     @Option(names = { "-st", "--subscription-type" }, description = 
"Subscription type")
     public SubscriptionType subscriptionType = SubscriptionType.Exclusive;
 
+    @Option(names = { "-sct", "--scalable-consumer-type" },
+            description = "V5 scalable-topic consumer API to use: Queue 
(unordered, individual ack) "
+                    + "or Stream (ordered, cumulative ack, 1:1 segment 
assignment). Use Stream with "
+                    + "more consumers than segments to drive auto-split 
(PIP-483).")
+    public ScalableConsumerType scalableConsumerType = 
ScalableConsumerType.Queue;
+
     @Option(names = { "-sp", "--subscription-position" }, description = 
"Subscription position")
     private SubscriptionInitialPosition subscriptionInitialPosition = 
SubscriptionInitialPosition.LATEST;
 
@@ -297,60 +318,28 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
         Semaphore messageReceiveLimiter = new 
Semaphore(this.numMessagesPerTransaction);
         Thread thread = Thread.currentThread();
 
-        QueueConsumerBuilder<byte[]> consumerBuilder = 
pulsarClient.newQueueConsumer(Schema.bytes())
-                .receiverQueueSize(this.receiverQueueSize)
-                
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
-                .subscriptionInitialPosition(this.subscriptionInitialPosition);
-
-        if (isNotBlank(this.encKeyFile)) {
-            // We do not know the key name from --encryption-key-value-file 
alone; PemFileKeyProvider
-            // expects a name → path mapping. The encryption test path uses 
subscribers that name keys
-            // explicitly; here we register the file under the same name the 
producer side used
-            // (defaults to the file path's last component if unset upstream).
-            String keyName = Path.of(this.encKeyFile).getFileName().toString();
-            PemFileKeyProvider keys = PemFileKeyProvider.builder()
-                    .privateKey(keyName, Path.of(this.encKeyFile))
-                    .build();
-            consumerBuilder.encryptionPolicy(ConsumerEncryptionPolicy.builder()
-                    .privateKeyProvider(keys)
-                    .build());
-        }
+        final ConsumerEncryptionPolicy encryptionPolicy = 
buildEncryptionPolicyOrNull();
 
-        List<Future<QueueConsumer<byte[]>>> futures = new ArrayList<>();
+        List<CompletableFuture<PerfConsumer>> futures = new ArrayList<>();
         for (int i = 0; i < this.numTopics; i++) {
             final TopicName topicName = TopicName.get(this.topics.get(i));
 
             log.info()
                     .attr("adding", this.numConsumers)
                     .attr("topic", topicName)
+                    .attr("consumerType", this.scalableConsumerType)
                     .log("Adding consumers per subscription on topic");
 
             for (int j = 0; j < this.numSubscriptions; j++) {
                 String subscriberName = this.subscriptions.get(j);
                 for (int k = 0; k < this.numConsumers; k++) {
-                    // V5 QueueConsumerBuilder has no clone(); build 
per-consumer to set topic+sub.
-                    QueueConsumerBuilder<byte[]> b = 
pulsarClient.newQueueConsumer(Schema.bytes())
-                            .receiverQueueSize(this.receiverQueueSize)
-                            
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
-                            
.subscriptionInitialPosition(this.subscriptionInitialPosition)
-                            
.replicateSubscriptionState(this.replicatedSubscription)
-                            .topic(topicName.toString())
-                            .subscriptionName(subscriberName);
-                    if (isNotBlank(this.encKeyFile)) {
-                        String keyName = 
Path.of(this.encKeyFile).getFileName().toString();
-                        PemFileKeyProvider keys = PemFileKeyProvider.builder()
-                                .privateKey(keyName, Path.of(this.encKeyFile))
-                                .build();
-                        b.encryptionPolicy(ConsumerEncryptionPolicy.builder()
-                                .privateKeyProvider(keys)
-                                .build());
-                    }
-                    futures.add(b.subscribeAsync());
+                    futures.add(subscribeAsync(pulsarClient, 
topicName.toString(), subscriberName,
+                            encryptionPolicy));
                 }
             }
         }
-        final List<QueueConsumer<byte[]>> consumers = new 
ArrayList<>(futures.size());
-        for (Future<QueueConsumer<byte[]>> future : futures) {
+        final List<PerfConsumer> consumers = new ArrayList<>(futures.size());
+        for (CompletableFuture<PerfConsumer> future : futures) {
             consumers.add(future.get());
         }
 
@@ -359,7 +348,7 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
         // per consumer mirrors the v4 dispatch concurrency closely enough for 
the perf workload.
         ExecutorService consumerExec = Executors.newCachedThreadPool(
                 new DefaultThreadFactory("pulsar-perf-consumer-poll"));
-        for (QueueConsumer<byte[]> consumer : consumers) {
+        for (PerfConsumer consumer : consumers) {
             consumerExec.submit(() -> pollLoop(consumer, atomicReference, 
messageAckedCount,
                     messageReceiveLimiter, limiter, testEndTime, thread, 
pulsarClient));
         }
@@ -466,7 +455,7 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
      * dedicated thread that drives {@code receive(timeout)} and runs the same 
per-message
      * handler the v4 listener did (latency record, rate-limit, ack, 
transaction commit/rollover).
      */
-    private void pollLoop(QueueConsumer<byte[]> consumer,
+    private void pollLoop(PerfConsumer consumer,
                           AtomicReference<Transaction> atomicReference,
                           AtomicLong messageAckedCount,
                           Semaphore messageReceiveLimiter,
@@ -543,7 +532,7 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                 }
                 Transaction txn = atomicReference.get();
                 try {
-                    consumer.acknowledge(msg.id(), txn);
+                    consumer.ackTxn(msg.id(), txn);
                     totalMessageAck.increment();
                     messageAck.increment();
                 } catch (Exception e) {
@@ -556,7 +545,7 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
                 }
             } else {
                 try {
-                    consumer.acknowledge(msg.id());
+                    consumer.ack(msg.id());
                     totalMessageAck.increment();
                     messageAck.increment();
                 } catch (Exception e) {
@@ -628,6 +617,107 @@ public class PerformanceConsumer extends 
PerformanceTopicListArguments{
         }
     }
 
+    /**
+     * Minimal common view over the V5 {@link QueueConsumer} / {@link StreamConsumer} APIs so the
+     * poll loop is independent of which scalable-topic consumer type was selected.
+     * <ul>
+     *   <li>{@code receive} delegates to the wrapped consumer's {@code receive(timeout)}.</li>
+     *   <li>{@code ack} maps to {@code acknowledge} for Queue and {@code acknowledgeCumulative}
+     *       for Stream.</li>
+     *   <li>{@code ackTxn} applies the same mapping, passing the given {@link Transaction}
+     *       along to the wrapped consumer.</li>
+     * </ul>
+     * All three methods are declared {@code throws Exception} so either underlying API can be
+     * adapted without translating its failures.
+     */
+    private interface PerfConsumer {
+        Message<byte[]> receive(Duration timeout) throws Exception;
+
+        void ack(MessageId messageId) throws Exception;
+
+        void ackTxn(MessageId messageId, Transaction txn) throws Exception;
+    }
+
+    private CompletableFuture<PerfConsumer> subscribeAsync(PulsarClient 
client, String topic,
+                                                           String subscription,
+                                                           
ConsumerEncryptionPolicy encryptionPolicy) {
+        if (this.scalableConsumerType == ScalableConsumerType.Stream) {
+            // StreamConsumer has no receiverQueueSize knob; the rest carries 
over. Deliberately
+            // do NOT set a consumerName: the controller keys group membership 
by consumer name,
+            // so the V5 client's auto-generated unique name keeps every 
consumer — within one
+            // process and across separate `pulsar-perf consume` invocations — 
a distinct member.
+            // (Setting a deterministic name would make two processes collide 
and the second be
+            // treated as a reconnect of the first.)
+            StreamConsumerBuilder<byte[]> b = 
client.newStreamConsumer(Schema.bytes())
+                    
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
+                    
.subscriptionInitialPosition(this.subscriptionInitialPosition)
+                    .replicateSubscriptionState(this.replicatedSubscription)
+                    .topic(topic)
+                    .subscriptionName(subscription);
+            if (encryptionPolicy != null) {
+                b.encryptionPolicy(encryptionPolicy);
+            }
+            return b.subscribeAsync().thenApply(PerformanceConsumer::wrap);
+        }
+        QueueConsumerBuilder<byte[]> b = 
client.newQueueConsumer(Schema.bytes())
+                .receiverQueueSize(this.receiverQueueSize)
+                
.acknowledgmentGroupTime(Duration.ofMillis(this.acknowledgmentsGroupingDelayMillis))
+                .subscriptionInitialPosition(this.subscriptionInitialPosition)
+                .replicateSubscriptionState(this.replicatedSubscription)
+                .topic(topic)
+                .subscriptionName(subscription);
+        if (encryptionPolicy != null) {
+            b.encryptionPolicy(encryptionPolicy);
+        }
+        return b.subscribeAsync().thenApply(PerformanceConsumer::wrap);
+    }
+
+    private ConsumerEncryptionPolicy buildEncryptionPolicyOrNull() {
+        if (!isNotBlank(this.encKeyFile)) {
+            return null;
+        }
+        // We do not know the key name from --encryption-key-value-file alone; 
PemFileKeyProvider
+        // expects a name → path mapping. Register the file under the same 
name the producer side
+        // used (defaults to the file path's last component if unset upstream).
+        String keyName = Path.of(this.encKeyFile).getFileName().toString();
+        PemFileKeyProvider keys = PemFileKeyProvider.builder()
+                .privateKey(keyName, Path.of(this.encKeyFile))
+                .build();
+        return ConsumerEncryptionPolicy.builder()
+                .privateKeyProvider(keys)
+                .build();
+    }
+
+    private static PerfConsumer wrap(QueueConsumer<byte[]> consumer) {
+        return new PerfConsumer() {
+            @Override
+            public Message<byte[]> receive(Duration timeout) throws Exception {
+                return consumer.receive(timeout);
+            }
+
+            @Override
+            public void ack(MessageId messageId) throws Exception {
+                consumer.acknowledge(messageId);
+            }
+
+            @Override
+            public void ackTxn(MessageId messageId, Transaction txn) throws 
Exception {
+                consumer.acknowledge(messageId, txn);
+            }
+        };
+    }
+
+    private static PerfConsumer wrap(StreamConsumer<byte[]> consumer) {
+        return new PerfConsumer() {
+            @Override
+            public Message<byte[]> receive(Duration timeout) throws Exception {
+                return consumer.receive(timeout);
+            }
+
+            @Override
+            public void ack(MessageId messageId) throws Exception {
+                consumer.acknowledgeCumulative(messageId);
+            }
+
+            @Override
+            public void ackTxn(MessageId messageId, Transaction txn) throws 
Exception {
+                consumer.acknowledgeCumulative(messageId, txn);
+            }
+        };
+    }
+
     private void printAggregatedThroughput(long start) {
         double elapsed = (System.nanoTime() - start) / 1e9;
         double rate = totalMessagesReceived.sum() / elapsed;
diff --git a/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceConsumerArgsTest.java b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceConsumerArgsTest.java
new file mode 100644
index 00000000000..0af41fe647f
--- /dev/null
+++ b/pulsar-testclient/src/test/java/org/apache/pulsar/testclient/PerformanceConsumerArgsTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.testclient;
+
+import static org.testng.Assert.assertEquals;
+import org.testng.annotations.Test;
+import picocli.CommandLine;
+
+/**
+ * Argument-parsing tests for the {@code --scalable-consumer-type} / {@code -sct} option of
+ * {@link PerformanceConsumer}. Only picocli parsing is exercised; the command is never run.
+ */
+public class PerformanceConsumerArgsTest {
+
+    /**
+     * Parses {@code args} into a fresh {@link PerformanceConsumer} via picocli.
+     *
+     * @param args command-line arguments, including the positional topic
+     * @return the populated command object
+     */
+    private static PerformanceConsumer parse(String... args) {
+        PerformanceConsumer consumer = new PerformanceConsumer();
+        new CommandLine(consumer).parseArgs(args);
+        return consumer;
+    }
+
+    @Test
+    public void testScalableConsumerTypeDefaultsToQueue() {
+        PerformanceConsumer consumer = parse("my-topic");
+        assertEquals(consumer.scalableConsumerType,
+                PerformanceConsumer.ScalableConsumerType.Queue);
+    }
+
+    @Test
+    public void testScalableConsumerTypeStreamLongOption() {
+        PerformanceConsumer consumer = parse("--scalable-consumer-type", "Stream", "my-topic");
+        assertEquals(consumer.scalableConsumerType,
+                PerformanceConsumer.ScalableConsumerType.Stream);
+    }
+
+    @Test
+    public void testScalableConsumerTypeShortOption() {
+        // Use the non-default value: asserting Queue here would also pass if the value given
+        // to -sct were never applied, because Queue is the field's default.
+        PerformanceConsumer consumer = parse("-sct", "Stream", "my-topic");
+        assertEquals(consumer.scalableConsumerType,
+                PerformanceConsumer.ScalableConsumerType.Stream);
+    }
+}

Reply via email to