Rancho-7 commented on code in PR #20301:
URL: https://github.com/apache/kafka/pull/20301#discussion_r2298545145
##########
tools/src/main/java/org/apache/kafka/tools/EndToEndLatency.java:
##########
@@ -221,4 +290,173 @@ private static KafkaProducer<byte[], byte[]>
createKafkaProducer(Optional<String
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<>(producerProps);
}
+
+ /**
+ * Converts legacy positional arguments to named arguments for backward
compatibility.
+ *
+ * @param args the command line arguments to convert
+ * @return converted named arguments
+ * @throws Exception if the legacy arguments are invalid
+ * @deprecated Positional argument usage is deprecated and will be removed
in Apache Kafka 5.0.
+ * Use named arguments instead: --bootstrap-server, --topic,
--num-records, --producer-acks, --record-size, --command-config
+ */
+ @Deprecated(since = "4.2", forRemoval = true)
+ static String[] convertLegacyArgsIfNeeded(String[] args) throws Exception {
+ if (args.length == 0) {
+ return args;
+ }
+
+ boolean hasRequiredNamedArgs = Arrays.stream(args).anyMatch(arg ->
+ arg.equals("--bootstrap-server") ||
+ arg.equals("--topic") ||
+ arg.equals("--num-records") ||
+ arg.equals("--producer-acks") ||
+ arg.equals("--record-size"));
+ if (hasRequiredNamedArgs) {
+ return args;
+ }
+
+ if (args.length != 5 && args.length != 6) {
+ throw new TerseException("Invalid number of arguments. Expected 5
or 6 positional arguments, but got " + args.length + ". " +
+ "Usage: bootstrap-server topic num-records producer-acks
record-size [optional] command-config");
+ }
+
+ return convertLegacyArgs(args);
+ }
+
+ private static String[] convertLegacyArgs(String[] legacyArgs) {
+ List<String> newArgs = new ArrayList<>();
+
+ // broker_list -> --bootstrap-server
+ newArgs.add("--bootstrap-server");
+ newArgs.add(legacyArgs[0]);
+
+ // topic -> --topic
+ newArgs.add("--topic");
+ newArgs.add(legacyArgs[1]);
+
+ // num_messages -> --num-records
+ newArgs.add("--num-records");
+ newArgs.add(legacyArgs[2]);
+
+ // producer_acks -> --producer-acks
+ newArgs.add("--producer-acks");
+ newArgs.add(legacyArgs[3]);
+
+ // message_size_bytes -> --record-size
+ newArgs.add("--record-size");
+ newArgs.add(legacyArgs[4]);
+
+ // properties_file -> --command-config
+ if (legacyArgs.length == 6 && !legacyArgs[5].trim().isEmpty()) {
+ newArgs.add("--command-config");
+ newArgs.add(legacyArgs[5]);
+ }
+ System.out.println("WARNING: Positional argument usage is deprecated
and will be removed in Apache Kafka 5.0. " +
+ "Please use named arguments instead: --bootstrap-server,
--topic, --num-records, --producer-acks, --record-size, --command-config");
+ return newArgs.toArray(new String[0]);
+ }
+
+ public static final class EndToEndLatencyCommandOptions extends
CommandDefaultOptions {
+ final OptionSpec<String> bootstrapServerOpt;
+ final OptionSpec<String> topicOpt;
+ final OptionSpec<Integer> numRecordsOpt;
+ final OptionSpec<String> acksOpt;
+ final OptionSpec<Integer> recordSizeOpt;
+ final OptionSpec<String> commandConfigOpt;
+ final OptionSpec<Integer> recordKeyOpt;
+ final OptionSpec<Integer> recordHeaderValueSizeOpt;
+ final OptionSpec<Integer> recordHeaderKeySizeOpt;
+ final OptionSpec<Integer> numHeadersOpt;
+
+ public EndToEndLatencyCommandOptions(String[] args) {
+ super(args);
+
+ bootstrapServerOpt = parser.accepts("bootstrap-server", "REQUIRED:
The Kafka broker list string in the form HOST1:PORT1,HOST2:PORT2.")
+ .withRequiredArg()
+ .describedAs("bootstrap-server")
+ .ofType(String.class);
+ topicOpt = parser.accepts("topic", "REQUIRED: The topic to use for
the test.")
+ .withRequiredArg()
+ .describedAs("topic-name")
+ .ofType(String.class);
+ numRecordsOpt = parser.accepts("num-records", "REQUIRED: The
number of messages to send.")
+ .withRequiredArg()
+ .describedAs("count")
+ .ofType(Integer.class);
+ acksOpt = parser.accepts("producer-acks", "REQUIRED: Producer
acknowledgements. Must be '1' or 'all'.")
+ .withRequiredArg()
+ .describedAs("producer-acks")
+ .ofType(String.class);
+ recordSizeOpt = parser.accepts("record-size", "REQUIRED: The size
of each message payload in bytes.")
+ .withRequiredArg()
+ .describedAs("bytes")
+ .ofType(Integer.class);
+ recordKeyOpt = parser.accepts("record-key-size", "Optional: The
size of the message key in bytes. If not set, messages are sent without a key.")
+ .withOptionalArg()
+ .describedAs("bytes")
+ .ofType(Integer.class)
+ .defaultsTo(0);
+ recordHeaderKeySizeOpt = parser.accepts("record-header-key-size",
"Optional: The size of the message header key in bytes. Used together with
record-header-size.")
+ .withOptionalArg()
+ .describedAs("bytes")
+ .ofType(Integer.class)
+ .defaultsTo(0);
+ recordHeaderValueSizeOpt = parser.accepts("record-header-size",
"Optional: The size of message header value in bytes. Use -1 for null header
value.")
+ .withOptionalArg()
+ .describedAs("bytes")
+ .ofType(Integer.class)
+ .defaultsTo(0);
+ numHeadersOpt = parser.accepts("num-headers", "Optional: The
number of headers to include in each message.")
+ .withOptionalArg()
+ .describedAs("count")
+ .ofType(Integer.class)
+ .defaultsTo(1);
Review Comment:
The KIP mentions that the default value for `num-headers` is set to 1, but
it doesn't specify the default value for the other arguments. I will add that
information to the KIP. Thanks!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]