Yunyung commented on code in PR #20301:
URL: https://github.com/apache/kafka/pull/20301#discussion_r2251888324
##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -77,21 +87,35 @@ static int mainNoExit(String... args) {
}
// Visible for testing
- static void execute(String... args) throws Exception {
- if (args.length != 5 && args.length != 6) {
- throw new TerseException("USAGE: java " +
EndToEndLatency.class.getName()
- + " broker_list topic num_messages producer_acks
message_size_bytes [optional] properties_file");
- }
+ static void execute(String[] args) throws Exception {
+ String[] processedArgs = convertLegacyArgsIfNeeded(args);
+ EndToEndLatencyCommandOptions opts = new
EndToEndLatencyCommandOptions(processedArgs);
- String brokers = args[0];
- String topic = args[1];
- int numMessages = Integer.parseInt(args[2]);
- String acks = args[3];
- int messageSizeBytes = Integer.parseInt(args[4]);
- Optional<String> propertiesFile = (args.length > 5 &&
!Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
+ //required
Review Comment:
Nit:
```suggestion
// required
```
##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -42,16 +49,19 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+
/**
* This class records the average end to end latency for a single message to
travel through Kafka.
* Following are the required arguments
- * <p> broker_list = location of the bootstrap broker for both the producer
and the consumer </p>
- * <p> topic = topic name used by both the producer and the consumer to
send/receive messages </p>
- * <p> num_messages = # messages to send </p>
- * <p> producer_acks = See ProducerConfig.ACKS_DOC </p>
- * <p> message_size_bytes = size of each message in bytes </p>
+ * <p> --bootstrap-server = location of the bootstrap broker for both the
producer and the consumer </p>
+ * <p> --topic = topic name used by both the producer and the consumer to
send/receive messages </p>
+ * <p> --num-records = # messages to send </p>
+ * <p> --producer-acks = See ProducerConfig.ACKS_DOC </p>
+ * <p> --record-size = size of each message value in bytes </p>
*
- * <p> e.g. [localhost:9092 test 10000 1 20] </p>
+ * <p> e.g. [/bin/kafka-e2e-latency.sh --bootstrap-server localhost:9092
--topic test-topic --num-records 1000 --producer-acks 1 --record-size 512] </p>
Review Comment:
```suggestion
* <p> e.g. [./bin/kafka-e2e-latency.sh --bootstrap-server localhost:9092
--topic test-topic --num-records 1000 --producer-acks 1 --record-size 512] </p>
```
##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -42,16 +49,19 @@
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import joptsimple.OptionException;
+import joptsimple.OptionSpec;
+
/**
* This class records the average end to end latency for a single message to
travel through Kafka.
* Following are the required arguments
- * <p> broker_list = location of the bootstrap broker for both the producer
and the consumer </p>
- * <p> topic = topic name used by both the producer and the consumer to
send/receive messages </p>
- * <p> num_messages = # messages to send </p>
- * <p> producer_acks = See ProducerConfig.ACKS_DOC </p>
- * <p> message_size_bytes = size of each message in bytes </p>
+ * <p> --bootstrap-server = location of the bootstrap broker for both the
producer and the consumer </p>
Review Comment:
Nit:
Java docs do not need `</p>`
##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +303,156 @@ private static KafkaProducer<byte[], byte[]>
createKafkaProducer(Optional<String
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(producerProps);
}
+
+ // Visible for testing
+ static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
Review Comment:
Should we add Deprecated tag?
##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +303,156 @@ private static KafkaProducer<byte[], byte[]>
createKafkaProducer(Optional<String
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(producerProps);
}
+
+ // Visible for testing
+ static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+ if (args.length == 0) {
+ return args;
+ }
+
+ boolean hasNamedArgs = Arrays.stream(args).anyMatch(arg ->
arg.startsWith("--"));
+ if (hasNamedArgs) {
+ return args;
+ }
+
+ if (args.length != 5 && args.length != 6) {
+ throw new TerseException("Invalid number of arguments. Expected 5
or 6 positional arguments, but got " + args.length + ". " +
+ "Usage: bootstrap-server topic num-records producer-acks
record-size [optional] command-config");
+ }
+
+ return convertLegacyArgs(args);
+ }
+
+ private static String[] convertLegacyArgs(String[] legacyArgs) {
+ List<String> newArgs = new ArrayList<>();
+
+ // broker_list -> --bootstrap-server
+ newArgs.add("--bootstrap-server");
+ newArgs.add(legacyArgs[0]);
+
+ // topic -> --topic
+ newArgs.add("--topic");
+ newArgs.add(legacyArgs[1]);
+
+ // num_messages -> --num-records
+ newArgs.add("--num-records");
+ newArgs.add(legacyArgs[2]);
+
+ // producer_acks -> --producer-acks
+ newArgs.add("--producer-acks");
+ newArgs.add(legacyArgs[3]);
+
+ // message_size_bytes -> --record-size
+ newArgs.add("--record-size");
+ newArgs.add(legacyArgs[4]);
+
+ // properties_file -> --command-config
+ if (legacyArgs.length == 6 && !legacyArgs[5].trim().isEmpty()) {
+ newArgs.add("--command-config");
+ newArgs.add(legacyArgs[5]);
+ }
+ System.out.println("WARNING: Positional argument usage is deprecated
and will be removed in Apache Kafka 5.0. " +
+ "Please use named arguments instead: --bootstrap-server,
--topic, --num-records, --producer-acks, --record-size, --command-config");
+ return newArgs.toArray(new String[0]);
+ }
+
+ public static final class EndToEndLatencyCommandOptions extends
CommandDefaultOptions {
+ final OptionSpec<String> bootstrapServerOpt;
+ final OptionSpec<String> topicOpt;
+ final OptionSpec<Integer> numRecordsOpt;
+ final OptionSpec<String> acksOpt;
+ final OptionSpec<Integer> recordSizeOpt;
+ final OptionSpec<String> commandConfigOpt;
+ final OptionSpec<Integer> recordKeyOpt;
+ final OptionSpec<Integer> recordHeaderValueSizeOpt;
+ final OptionSpec<Integer> recordHeaderKeySizeOpt;
+ final OptionSpec<Integer> numHeadersOpt;
+
+ public EndToEndLatencyCommandOptions(String[] args) {
+ super(args);
+
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED:
The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+ .withRequiredArg()
+ .describedAs("bootstrap-server")
+ .ofType(String.class);
+ topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for
the test.")
+ .withRequiredArg()
+ .describedAs("topic-name")
+ .ofType(String.class);
+ numRecordsOpt = parser.accepts("num-records", "REQUIRED: The
number of messages to send.")
+ .withRequiredArg()
+ .describedAs("count")
+ .ofType(Integer.class);
+ acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer
acknowledgements. Must be '1' or 'all'.")
+ .withRequiredArg()
+ .describedAs("producer-acks")
+ .ofType(String.class);
+ recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size
of each message payload in bytes.")
+ .withRequiredArg()
+ .describedAs("bytes")
+ .ofType(Integer.class);
+ recordKeyOpt = parser.accepts("record-key-size", "Optional: The
size of the message key in bytes. If not set, messages are sent without a key.")
+ .withOptionalArg()
+ .describedAs("bytes")
+ .ofType(Integer.class);
+ recordHeaderKeySizeOpt = parser.accepts("record-header-key-size",
"Optional: The size of the message header key in bytes. Used together with
record-header-size.")
+ .withOptionalArg()
+ .describedAs("bytes")
+ .ofType(Integer.class);
+ recordHeaderValueSizeOpt = parser.accepts("record-header-size",
"Optional: The size of message header value in bytes.")
+ .withOptionalArg()
+ .describedAs("bytes")
+ .ofType(Integer.class);
+ numHeadersOpt = parser.accepts("num-headers", "Optional: The
number of headers to include in each message.")
+ .withOptionalArg()
+ .describedAs("count")
+ .ofType(Integer.class)
+ .defaultsTo(1);
+ commandConfigOpt = parser.accepts("command-config", "Optional: A
property file for Kafka producer/consumer/admin client configuration.")
+ .withOptionalArg()
+ .describedAs("config-file")
+ .ofType(String.class);
+
+ try {
+ options = parser.parse(args);
+ } catch (OptionException e) {
+ CommandLineUtils.printUsageAndExit(parser, e.getMessage());
+ }
+ checkArgs();
+ }
+
+ void checkArgs() {
+ CommandLineUtils.maybePrintHelpOrVersion(this, "This tool measures
end-to-end latency in Kafka by sending messages and timing their reception.");
+
+ // check required arguments
+ CommandLineUtils.checkRequiredArgs(parser, options,
bootstrapServerOpt, topicOpt, numRecordsOpt, acksOpt, recordSizeOpt);
+
+ // validate 'producer-acks'
+ String acksValue = options.valueOf(acksOpt);
+ if (!List.of("1", "all").contains(acksValue)) {
Review Comment:
How about `-1`?
##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -77,21 +87,35 @@ static int mainNoExit(String... args) {
}
// Visible for testing
- static void execute(String... args) throws Exception {
- if (args.length != 5 && args.length != 6) {
- throw new TerseException("USAGE: java " +
EndToEndLatency.class.getName()
- + " broker_list topic num_messages producer_acks
message_size_bytes [optional] properties_file");
- }
+ static void execute(String[] args) throws Exception {
+ String[] processedArgs = convertLegacyArgsIfNeeded(args);
+ EndToEndLatencyCommandOptions opts = new
EndToEndLatencyCommandOptions(processedArgs);
- String brokers = args[0];
- String topic = args[1];
- int numMessages = Integer.parseInt(args[2]);
- String acks = args[3];
- int messageSizeBytes = Integer.parseInt(args[4]);
- Optional<String> propertiesFile = (args.length > 5 &&
!Utils.isBlank(args[5])) ? Optional.of(args[5]) : Optional.empty();
+ //required
+ String brokers = opts.options.valueOf(opts.bootstrapServerOpt);
+ String topic = opts.options.valueOf(opts.topicOpt);
+ int numRecords = opts.options.valueOf(opts.numRecordsOpt);
+ String acks = opts.options.valueOf(opts.acksOpt);
+ int recordValueSize = opts.options.valueOf(opts.recordSizeOpt);
- if (!List.of("1", "all").contains(acks)) {
- throw new IllegalArgumentException("Latency testing requires
synchronous acknowledgement. Please use 1 or all");
+ //optional
Review Comment:
Nit
```suggestion
// optional
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]