This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f97b95c60a2 KAFKA-19498 Add include argument to ConsumerPerformance
tool (#20221)
f97b95c60a2 is described below
commit f97b95c60a22cc34c6349bfe0c8596db28b0a17d
Author: Federico Valeri <[email protected]>
AuthorDate: Sun Aug 24 22:15:37 2025 +0200
KAFKA-19498 Add include argument to ConsumerPerformance tool (#20221)
This patch adds the include argument to the ConsumerPerformance tool.
ConsoleConsumer and ConsumerPerformance serve different purposes but
share common functionality for message consumption. Currently, there's
an inconsistency in their command-line interfaces:
- ConsoleConsumer supports an --include argument that allows users to
specify a regular expression pattern to filter topics for consumption
- ConsumerPerformance lacks this topic filtering capability, requiring
users to specify a single topic explicitly via the --topic argument
This inconsistency creates two problems:
- Similar tools should provide similar topic selection capabilities for
better user experience
- Users cannot test consumer performance across multiple topics or
dynamically matching topic sets, making it difficult to test realistic
scenarios
Reviewers: Chia-Ping Tsai <[email protected]>
---
docs/upgrade.html | 4 +
.../apache/kafka/server/util/CommandLineUtils.java | 25 +++++
.../kafka/server/util/CommandLineUtilsTest.java | 106 +++++++++++++++++++++
.../apache/kafka/tools/ConsumerPerformance.java | 35 +++++--
.../tools/consumer/ConsoleConsumerOptions.java | 8 +-
.../kafka/tools/ConsumerPerformanceTest.java | 43 ++++++++-
6 files changed, 206 insertions(+), 15 deletions(-)
diff --git a/docs/upgrade.html b/docs/upgrade.html
index f0f3f540738..1dbb7e2d2ee 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -40,6 +40,10 @@
<li>
The <code>PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG</code> in
<code>ProducerConfig</code> was deprecated and will be removed in Kafka 5.0.
Please use the <code>PARTITIONER_ADAPTIVE_PARTITIONING_ENABLE_CONFIG</code>
instead.
</li>
+ <li>
+ The <code>ConsumerPerformance</code> command line tool has a new
<code>include</code> option that is alternative to the <code>topic</code>
option.
+ This new option allows to pass a regular expression specifying a list
of topics to include for consumption, which is useful to test consumer
performance across multiple topics or dynamically matching topic sets.
+ </li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
diff --git
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
index 6146daeb45c..c5b973f78e7 100644
---
a/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
+++
b/server-common/src/main/java/org/apache/kafka/server/util/CommandLineUtils.java
@@ -23,10 +23,12 @@ import org.apache.kafka.common.utils.Exit;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.stream.Collectors;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
@@ -135,6 +137,29 @@ public class CommandLineUtils {
Exit.exit(1, message);
}
+ /**
+ * Check that exactly one of a set of mutually exclusive arguments is
present.
+ */
+ public static void checkOneOfArgs(OptionParser parser, OptionSet options,
OptionSpec<?>... optionSpecs) {
+ if (optionSpecs == null || optionSpecs.length == 0) {
+ throw new IllegalArgumentException("At least one option must be
provided");
+ }
+
+ int presentCount = 0;
+ for (OptionSpec<?> spec : optionSpecs) {
+ if (options.has(spec)) {
+ presentCount++;
+ }
+ }
+
+ if (presentCount != 1) {
+ printUsageAndExit(parser, "Exactly one of the following arguments
is required: " +
+ Arrays.stream(optionSpecs)
+ .map(Object::toString)
+ .collect(Collectors.joining(", ")));
+ }
+ }
+
public static void printUsageAndExit(OptionParser parser, String message) {
System.err.println(message);
try {
diff --git
a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
index 8fdc6c89d06..a634b21403e 100644
---
a/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/util/CommandLineUtilsTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.server.util;
+import org.apache.kafka.common.utils.Exit;
+
import org.junit.jupiter.api.Test;
import java.util.List;
@@ -26,9 +28,12 @@ import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class CommandLineUtilsTest {
@Test
@@ -266,4 +271,105 @@ public class CommandLineUtilsTest {
() ->
CommandLineUtils.initializeBootstrapProperties(createTestProps(),
Optional.of("127.0.0.2:9094"),
Optional.of("127.0.0.3:9095"))).getMessage());
}
+
+ private OptionSpec<String> createMockOptionSpec(String name) {
+ OptionSpec<String> spec = mock(OptionSpec.class);
+ when(spec.toString()).thenReturn("[" + name.replaceAll("--", "") +
"]");
+ return spec;
+ }
+
+ @Test
+ void testCheckOneOfArgsNoOptions() {
+ OptionParser parser = mock(OptionParser.class);
+ OptionSet options = mock(OptionSet.class);
+
+ IllegalArgumentException e =
assertThrows(IllegalArgumentException.class, () ->
+ CommandLineUtils.checkOneOfArgs(parser, options)
+ );
+
+ assertEquals("At least one option must be provided", e.getMessage());
+ }
+
+ @Test
+ void testCheckOneOfArgsOnePresent() {
+ OptionParser parser = mock(OptionParser.class);
+ OptionSet options = mock(OptionSet.class);
+ OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
+ OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
+ OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
+
+ when(options.has(opt1)).thenReturn(true);
+ when(options.has(opt2)).thenReturn(false);
+ when(options.has(opt3)).thenReturn(false);
+
+ assertDoesNotThrow(() ->
+ CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2,
opt3)
+ );
+
+ when(options.has(opt1)).thenReturn(false);
+ when(options.has(opt2)).thenReturn(true);
+
+ assertDoesNotThrow(() ->
+ CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2,
opt3)
+ );
+
+ when(options.has(opt2)).thenReturn(false);
+ when(options.has(opt3)).thenReturn(true);
+
+ assertDoesNotThrow(() ->
+ CommandLineUtils.checkOneOfArgs(parser, options, opt1, opt2,
opt3)
+ );
+ }
+
+ @Test
+ void testCheckOneOfArgsNonePresent() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ OptionParser parser = mock(OptionParser.class);
+ OptionSet options = mock(OptionSet.class);
+ OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
+ OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
+ OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
+
+ when(options.has(opt1)).thenReturn(false);
+ when(options.has(opt2)).thenReturn(false);
+ when(options.has(opt3)).thenReturn(false);
+
+ try {
+ IllegalArgumentException e =
assertThrows(IllegalArgumentException.class,
+ () -> CommandLineUtils.checkOneOfArgs(parser, options,
opt1, opt2, opt3));
+ assertEquals("Exactly one of the following arguments is required:
" +
+ "[first-option], [second-option], [third-option]",
e.getMessage());
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ void testCheckOneOfArgsMultiplePresent() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ OptionParser parser = mock(OptionParser.class);
+ OptionSet options = mock(OptionSet.class);
+ OptionSpec<String> opt1 = createMockOptionSpec("--first-option");
+ OptionSpec<String> opt2 = createMockOptionSpec("--second-option");
+ OptionSpec<String> opt3 = createMockOptionSpec("--third-option");
+
+ when(options.has(opt1)).thenReturn(true);
+ when(options.has(opt2)).thenReturn(true);
+ when(options.has(opt3)).thenReturn(false);
+
+ try {
+ IllegalArgumentException e =
assertThrows(IllegalArgumentException.class,
+ () -> CommandLineUtils.checkOneOfArgs(parser, options,
opt1, opt2, opt3));
+ assertEquals("Exactly one of the following arguments is required:
" +
+ "[first-option], [second-option], [third-option]",
e.getMessage());
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
index 60b4b37abe4..0892693801a 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ConsumerPerformance.java
@@ -37,11 +37,13 @@ import java.io.IOException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Random;
-import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
import joptsimple.OptionException;
import joptsimple.OptionSpec;
@@ -134,8 +136,13 @@ public class ConsumerPerformance {
long reportingIntervalMs = options.reportingIntervalMs();
boolean showDetailedStats = options.showDetailedStats();
SimpleDateFormat dateFormat = options.dateFormat();
- consumer.subscribe(options.topic(),
- new ConsumerPerfRebListener(joinTimeMs, joinStartMs,
joinTimeMsInSingleRound));
+
+ ConsumerPerfRebListener listener = new
ConsumerPerfRebListener(joinTimeMs, joinStartMs, joinTimeMsInSingleRound);
+ if (options.topic().isPresent()) {
+ consumer.subscribe(options.topic().get(), listener);
+ } else {
+ consumer.subscribe(options.include().get(), listener);
+ }
// now start the benchmark
long currentTimeMs = System.currentTimeMillis();
@@ -246,6 +253,7 @@ public class ConsumerPerformance {
protected static class ConsumerPerfOptions extends CommandDefaultOptions {
private final OptionSpec<String> bootstrapServerOpt;
private final OptionSpec<String> topicOpt;
+ private final OptionSpec<String> includeOpt;
private final OptionSpec<String> groupIdOpt;
private final OptionSpec<Integer> fetchSizeOpt;
private final OptionSpec<Void> resetBeginningOffsetOpt;
@@ -265,10 +273,14 @@ public class ConsumerPerformance {
.withRequiredArg()
.describedAs("server to connect to")
.ofType(String.class);
- topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume
from.")
+ topicOpt = parser.accepts("topic", "The topic to consume from.")
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
+ includeOpt = parser.accepts("include", "Regular expression
specifying list of topics to include for consumption.")
+ .withRequiredArg()
+ .describedAs("Java regex (String)")
+ .ofType(String.class);
groupIdOpt = parser.accepts("group", "The group id to consume on.")
.withRequiredArg()
.describedAs("gid")
@@ -323,7 +335,8 @@ public class ConsumerPerformance {
}
if (options != null) {
CommandLineUtils.maybePrintHelpOrVersion(this, "This tool is
used to verify the consumer performance.");
- CommandLineUtils.checkRequiredArgs(parser, options, topicOpt,
numMessagesOpt);
+ CommandLineUtils.checkRequiredArgs(parser, options,
numMessagesOpt);
+ CommandLineUtils.checkOneOfArgs(parser, options, topicOpt,
includeOpt);
}
}
@@ -353,8 +366,16 @@ public class ConsumerPerformance {
return props;
}
- public Set<String> topic() {
- return Set.of(options.valueOf(topicOpt));
+ public Optional<Collection<String>> topic() {
+ return options.has(topicOpt)
+ ? Optional.of(List.of(options.valueOf(topicOpt)))
+ : Optional.empty();
+ }
+
+ public Optional<Pattern> include() {
+ return options.has(includeOpt)
+ ? Optional.of(Pattern.compile(options.valueOf(includeOpt)))
+ : Optional.empty();
}
public long numMessages() {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
index cf4daa0c636..60ee8f61ffa 100644
---
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
+++
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java
@@ -25,10 +25,8 @@ import org.apache.kafka.server.util.CommandDefaultOptions;
import org.apache.kafka.server.util.CommandLineUtils;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
@@ -185,12 +183,8 @@ public final class ConsoleConsumerOptions extends
CommandDefaultOptions {
}
private void checkRequiredArgs() {
- List<Optional<String>> topicOrFilterArgs = new
ArrayList<>(List.of(topicArg(), includedTopicsArg()));
- topicOrFilterArgs.removeIf(Optional::isEmpty);
// user need to specify value for either --topic or --include options
- if (topicOrFilterArgs.size() != 1) {
- CommandLineUtils.printUsageAndExit(parser, "Exactly one of
--include/--topic is required. ");
- }
+ CommandLineUtils.checkOneOfArgs(parser, options, topicOpt, includeOpt);
if (partitionArg().isPresent()) {
if (!options.has(topicOpt)) {
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
index 270fab2cf80..d78b65e54a3 100644
--- a/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/ConsumerPerformanceTest.java
@@ -75,7 +75,7 @@ public class ConsumerPerformanceTest {
ConsumerPerformance.ConsumerPerfOptions config = new
ConsumerPerformance.ConsumerPerfOptions(args);
assertEquals("localhost:9092", config.brokerHostsAndPorts());
- assertTrue(config.topic().contains("test"));
+ assertTrue(config.topic().get().contains("test"));
assertEquals(10, config.numMessages());
}
@@ -93,6 +93,47 @@ public class ConsumerPerformanceTest {
assertTrue(err.contains("new-consumer is not a recognized option"));
}
+ @Test
+ public void testConfigWithInclude() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--include", "test.*",
+ "--messages", "10"
+ };
+
+ ConsumerPerformance.ConsumerPerfOptions config = new
ConsumerPerformance.ConsumerPerfOptions(args);
+
+ assertEquals("localhost:9092", config.brokerHostsAndPorts());
+ assertTrue(config.include().get().toString().contains("test.*"));
+ assertEquals(10, config.numMessages());
+ }
+
+ @Test
+ public void testConfigWithTopicAndInclude() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--include", "test.*",
+ "--messages", "10"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() -> new
ConsumerPerformance.ConsumerPerfOptions(args));
+
+ assertTrue(err.contains("Exactly one of the following arguments is
required: [topic], [include]"));
+ }
+
+ @Test
+ public void testConfigWithoutTopicAndInclude() {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--messages", "10"
+ };
+
+ String err = ToolsTestUtils.captureStandardErr(() -> new
ConsumerPerformance.ConsumerPerfOptions(args));
+
+ assertTrue(err.contains("Exactly one of the following arguments is
required: [topic], [include]"));
+ }
+
@Test
public void testClientIdOverride() throws IOException {
File tempFile =
Files.createFile(tempDir.resolve("test_consumer_config.conf")).toFile();