This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a931f85835c KAFKA-19625: Consistency of command-line arguments for 
verifiable producer/consumer (#20390)
a931f85835c is described below

commit a931f85835c35fae650e02594bc06cb42dc33c90
Author: jimmy <[email protected]>
AuthorDate: Wed Aug 27 17:53:26 2025 +0800

    KAFKA-19625: Consistency of command-line arguments for verifiable 
producer/consumer (#20390)
    
    As described in
    [jira](https://issues.apache.org/jira/browse/KAFKA-19625), this PR
    implements replace `consumer.config` and `producer.config` with
    `command-config` for kafka-verifiable-producer.sh and
    kafka-verifiable-consumer.sh.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 tests/kafkatest/services/verifiable_client.py      |  6 +++--
 tests/kafkatest/services/verifiable_consumer.py    |  2 +-
 tests/kafkatest/services/verifiable_producer.py    |  2 +-
 .../org/apache/kafka/tools/VerifiableConsumer.java | 27 +++++++++++++++++++---
 .../org/apache/kafka/tools/VerifiableProducer.java | 27 +++++++++++++++++++---
 5 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/tests/kafkatest/services/verifiable_client.py 
b/tests/kafkatest/services/verifiable_client.py
index 16617d621aa..b6c19267328 100644
--- a/tests/kafkatest/services/verifiable_client.py
+++ b/tests/kafkatest/services/verifiable_client.py
@@ -74,7 +74,8 @@ Command line arguments:
  * `--enable-autocommit`
  * `--max-messages <n>`
  * `--assignment-strategy <s>`
- * `--consumer.config <config-file>` - consumer config properties (typically 
empty)
+ * `--consumer.config <config-file>` - (DEPRECATED) consumer config properties 
(typically empty). This option will be removed in a future version. Use 
--command-config instead.
+ * `--command-config <config-file>` - command config properties
 
 Environment variables:
  * `LOG_DIR` - log output directory. Typically not needed if logs are written 
to stderr.
@@ -97,7 +98,8 @@ Command line arguments:
  * `--broker-list <brokers>`
  * `--max-messages <n>`
  * `--throughput <msgs/s>`
- * `--producer.config <config-file>` - producer config properties (typically 
empty)
+ * `--producer.config <config-file>` - producer config properties (typically 
empty). This option will be removed in a future version. Use --command-config 
instead.
+ * `--command-config <config-file>` - command config properties
 
 Environment variables:
  * `LOG_DIR` - log output directory. Typically not needed if logs are written 
to stderr.
diff --git a/tests/kafkatest/services/verifiable_consumer.py 
b/tests/kafkatest/services/verifiable_consumer.py
index 8264566f1c2..99c403ce190 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -424,7 +424,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         if self.max_messages > 0:
             cmd += " --max-messages %s" % str(self.max_messages)
 
-        cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE
+        cmd += " --command-config %s" % VerifiableConsumer.CONFIG_FILE
         cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE, 
VerifiableConsumer.STDOUT_CAPTURE)
         return cmd
 
diff --git a/tests/kafkatest/services/verifiable_producer.py 
b/tests/kafkatest/services/verifiable_producer.py
index e7a8f411f3a..049de4ce986 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -249,7 +249,7 @@ class VerifiableProducer(KafkaPathResolverMixin, 
VerifiableClientMixin, Backgrou
         if self.repeating_keys is not None:
             cmd += " --repeating-keys %s " % str(self.repeating_keys)
 
-        cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
+        cmd += " --command-config %s" % VerifiableProducer.CONFIG_FILE
 
         cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, 
VerifiableProducer.STDOUT_CAPTURE)
         return cmd
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 1fa1aed8c4f..1ecea5331dd 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -597,7 +597,7 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
                 .setDefault("earliest")
                 .type(String.class)
                 .dest("resetPolicy")
-                .help("Set reset policy (must be either 'earliest', 'latest', 
or 'none'");
+                .help("Set reset policy (must be either 'earliest', 'latest', 
or 'none')");
 
         parser.addArgument("--assignment-strategy")
                 .action(store())
@@ -611,8 +611,17 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
                 .action(store())
                 .required(false)
                 .type(String.class)
-                .metavar("CONFIG_FILE")
-                .help("Consumer config properties file (config options shared 
with command line parameters will be overridden).");
+                .metavar("CONFIG-FILE")
+                .help("(DEPRECATED) Consumer config properties file" +
+                        "This option will be removed in a future version. Use 
--command-config instead");
+
+        parser.addArgument("--command-config")
+                .action(store())
+                .required(false)
+                .type(String.class)
+                .metavar("CONFIG-FILE")
+                .dest("commandConfigFile")
+                .help("Config properties file (config options shared with 
command line parameters will be overridden).");
 
         return parser;
     }
@@ -622,16 +631,28 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
 
         boolean useAutoCommit = res.getBoolean("useAutoCommit");
         String configFile = res.getString("consumer.config");
+        String commandConfigFile = res.getString("commandConfigFile");
         String brokerHostAndPort = res.getString("bootstrapServer");
 
         Properties consumerProps = new Properties();
+        if (configFile != null && commandConfigFile != null) {
+            throw new ArgumentParserException("Options --consumer.config and 
--command-config are mutually exclusive.", parser);
+        }
         if (configFile != null) {
+            System.out.println("Option --consumer.config has been deprecated 
and will be removed in a future version. Use --command-config instead.");
             try {
                 consumerProps.putAll(Utils.loadProps(configFile));
             } catch (IOException e) {
                 throw new ArgumentParserException(e.getMessage(), parser);
             }
         }
+        if (commandConfigFile != null) {
+            try {
+                
consumerProps.putAll(Utils.loadProps(res.getString(commandConfigFile)));
+            } catch (IOException e) {
+                throw new ArgumentParserException(e.getMessage(), parser);
+            }
+        }
 
         GroupProtocol groupProtocol = 
GroupProtocol.of(res.getString("groupProtocol"));
         consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 33f0b3142e8..bfbbb8a4854 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -161,8 +161,9 @@ public class VerifiableProducer implements AutoCloseable {
                 .action(store())
                 .required(false)
                 .type(String.class)
-                .metavar("CONFIG_FILE")
-                .help("Producer config properties file.");
+                .metavar("CONFIG-FILE")
+                .help("(DEPRECATED) Producer config properties file. " +
+                        "This option will be removed in a future version. Use 
--command-config instead.");
 
         parser.addArgument("--message-create-time")
                 .action(store())
@@ -189,6 +190,14 @@ public class VerifiableProducer implements AutoCloseable {
             .dest("repeatingKeys")
             .help("If specified, each produced record will have a key starting 
at 0 increment by 1 up to the number specified (exclusive), then the key is set 
to 0 again");
 
+        parser.addArgument("--command-config")
+            .action(store())
+            .required(false)
+            .type(String.class)
+            .metavar("CONFIG-FILE")
+            .dest("commandConfigFile")
+            .help("Config properties file (config options shared with command 
line parameters will be overridden).");
+
         return parser;
     }
     
@@ -217,6 +226,7 @@ public class VerifiableProducer implements AutoCloseable {
         String topic = res.getString("topic");
         int throughput = res.getInt("throughput");
         String configFile = res.getString("producer.config");
+        String commandConfigFile = res.getString("commandConfigFile");
         Integer valuePrefix = res.getInt("valuePrefix");
         Long createTime = res.getLong("createTime");
         Integer repeatingKeys = res.getInt("repeatingKeys");
@@ -240,14 +250,25 @@ public class VerifiableProducer implements AutoCloseable {
         producerProps.put(ProducerConfig.ACKS_CONFIG, 
Integer.toString(res.getInt("acks")));
         // No producer retries
         producerProps.put(ProducerConfig.RETRIES_CONFIG, "0");
+        if (configFile != null && commandConfigFile != null) {
+            throw new ArgumentParserException("Options --producer.config and 
--command-config are mutually exclusive.", parser);
+        }
+
         if (configFile != null) {
+            System.out.println("Option --producer.config has been deprecated 
and will be removed in a future version. Use --command-config instead.");
             try {
                 producerProps.putAll(loadProps(configFile));
             } catch (IOException e) {
                 throw new ArgumentParserException(e.getMessage(), parser);
             }
         }
-
+        if (commandConfigFile != null) {
+            try {
+                producerProps.putAll(loadProps(commandConfigFile));
+            } catch (IOException e) {
+                throw new ArgumentParserException(e.getMessage(), parser);
+            }
+        }
         StringSerializer serializer = new StringSerializer();
         KafkaProducer<String, String> producer = new 
KafkaProducer<>(producerProps, serializer, serializer);
 

Reply via email to