This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new a931f85835c KAFKA-19625: Consistency of command-line arguments for
verifiable producer/consumer (#20390)
a931f85835c is described below
commit a931f85835c35fae650e02594bc06cb42dc33c90
Author: jimmy <[email protected]>
AuthorDate: Wed Aug 27 17:53:26 2025 +0800
KAFKA-19625: Consistency of command-line arguments for verifiable
producer/consumer (#20390)
As described in
[jira](https://issues.apache.org/jira/browse/KAFKA-19625), this PR
implements replace `consumer.config` and `producer.config` with
`command-config` for kafka-verifiable-producer.sh and
kafka-verifiable-consumer.sh.
Reviewers: Andrew Schofield <[email protected]>
---
tests/kafkatest/services/verifiable_client.py | 6 +++--
tests/kafkatest/services/verifiable_consumer.py | 2 +-
tests/kafkatest/services/verifiable_producer.py | 2 +-
.../org/apache/kafka/tools/VerifiableConsumer.java | 27 +++++++++++++++++++---
.../org/apache/kafka/tools/VerifiableProducer.java | 27 +++++++++++++++++++---
5 files changed, 54 insertions(+), 10 deletions(-)
diff --git a/tests/kafkatest/services/verifiable_client.py
b/tests/kafkatest/services/verifiable_client.py
index 16617d621aa..b6c19267328 100644
--- a/tests/kafkatest/services/verifiable_client.py
+++ b/tests/kafkatest/services/verifiable_client.py
@@ -74,7 +74,8 @@ Command line arguments:
* `--enable-autocommit`
* `--max-messages <n>`
* `--assignment-strategy <s>`
- * `--consumer.config <config-file>` - consumer config properties (typically
empty)
+ * `--consumer.config <config-file>` - (DEPRECATED) consumer config properties
(typically empty). This option will be removed in a future version. Use
--command-config instead.
+ * `--command-config <config-file>` - command config properties
Environment variables:
* `LOG_DIR` - log output directory. Typically not needed if logs are written
to stderr.
@@ -97,7 +98,8 @@ Command line arguments:
* `--broker-list <brokers>`
* `--max-messages <n>`
* `--throughput <msgs/s>`
- * `--producer.config <config-file>` - producer config properties (typically
empty)
+ * `--producer.config <config-file>` - producer config properties (typically
empty). This option will be removed in a future version. Use --command-config
instead.
+ * `--command-config <config-file>` - command config properties
Environment variables:
* `LOG_DIR` - log output directory. Typically not needed if logs are written
to stderr.
diff --git a/tests/kafkatest/services/verifiable_consumer.py
b/tests/kafkatest/services/verifiable_consumer.py
index 8264566f1c2..99c403ce190 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -424,7 +424,7 @@ class VerifiableConsumer(KafkaPathResolverMixin,
VerifiableClientMixin, Backgrou
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
- cmd += " --consumer.config %s" % VerifiableConsumer.CONFIG_FILE
+ cmd += " --command-config %s" % VerifiableConsumer.CONFIG_FILE
cmd += " 2>> %s | tee -a %s &" % (VerifiableConsumer.STDOUT_CAPTURE,
VerifiableConsumer.STDOUT_CAPTURE)
return cmd
diff --git a/tests/kafkatest/services/verifiable_producer.py
b/tests/kafkatest/services/verifiable_producer.py
index e7a8f411f3a..049de4ce986 100644
--- a/tests/kafkatest/services/verifiable_producer.py
+++ b/tests/kafkatest/services/verifiable_producer.py
@@ -249,7 +249,7 @@ class VerifiableProducer(KafkaPathResolverMixin,
VerifiableClientMixin, Backgrou
if self.repeating_keys is not None:
cmd += " --repeating-keys %s " % str(self.repeating_keys)
- cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
+ cmd += " --command-config %s" % VerifiableProducer.CONFIG_FILE
cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE,
VerifiableProducer.STDOUT_CAPTURE)
return cmd
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 1fa1aed8c4f..1ecea5331dd 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -597,7 +597,7 @@ public class VerifiableConsumer implements Closeable,
OffsetCommitCallback, Cons
.setDefault("earliest")
.type(String.class)
.dest("resetPolicy")
- .help("Set reset policy (must be either 'earliest', 'latest',
or 'none'");
+ .help("Set reset policy (must be either 'earliest', 'latest',
or 'none')");
parser.addArgument("--assignment-strategy")
.action(store())
@@ -611,8 +611,17 @@ public class VerifiableConsumer implements Closeable,
OffsetCommitCallback, Cons
.action(store())
.required(false)
.type(String.class)
- .metavar("CONFIG_FILE")
- .help("Consumer config properties file (config options shared
with command line parameters will be overridden).");
+ .metavar("CONFIG-FILE")
+ .help("(DEPRECATED) Consumer config properties file" +
+ "This option will be removed in a future version. Use
--command-config instead");
+
+ parser.addArgument("--command-config")
+ .action(store())
+ .required(false)
+ .type(String.class)
+ .metavar("CONFIG-FILE")
+ .dest("commandConfigFile")
+ .help("Config properties file (config options shared with
command line parameters will be overridden).");
return parser;
}
@@ -622,16 +631,28 @@ public class VerifiableConsumer implements Closeable,
OffsetCommitCallback, Cons
boolean useAutoCommit = res.getBoolean("useAutoCommit");
String configFile = res.getString("consumer.config");
+ String commandConfigFile = res.getString("commandConfigFile");
String brokerHostAndPort = res.getString("bootstrapServer");
Properties consumerProps = new Properties();
+ if (configFile != null && commandConfigFile != null) {
+ throw new ArgumentParserException("Options --consumer.config and
--command-config are mutually exclusive.", parser);
+ }
if (configFile != null) {
+ System.out.println("Option --consumer.config has been deprecated
and will be removed in a future version. Use --command-config instead.");
try {
consumerProps.putAll(Utils.loadProps(configFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}
}
+ if (commandConfigFile != null) {
+ try {
+
consumerProps.putAll(Utils.loadProps(res.getString(commandConfigFile)));
+ } catch (IOException e) {
+ throw new ArgumentParserException(e.getMessage(), parser);
+ }
+ }
GroupProtocol groupProtocol =
GroupProtocol.of(res.getString("groupProtocol"));
consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
groupProtocol.name());
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
index 33f0b3142e8..bfbbb8a4854 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
@@ -161,8 +161,9 @@ public class VerifiableProducer implements AutoCloseable {
.action(store())
.required(false)
.type(String.class)
- .metavar("CONFIG_FILE")
- .help("Producer config properties file.");
+ .metavar("CONFIG-FILE")
+ .help("(DEPRECATED) Producer config properties file. " +
+ "This option will be removed in a future version. Use
--command-config instead.");
parser.addArgument("--message-create-time")
.action(store())
@@ -189,6 +190,14 @@ public class VerifiableProducer implements AutoCloseable {
.dest("repeatingKeys")
.help("If specified, each produced record will have a key starting
at 0 increment by 1 up to the number specified (exclusive), then the key is set
to 0 again");
+ parser.addArgument("--command-config")
+ .action(store())
+ .required(false)
+ .type(String.class)
+ .metavar("CONFIG-FILE")
+ .dest("commandConfigFile")
+ .help("Config properties file (config options shared with command
line parameters will be overridden).");
+
return parser;
}
@@ -217,6 +226,7 @@ public class VerifiableProducer implements AutoCloseable {
String topic = res.getString("topic");
int throughput = res.getInt("throughput");
String configFile = res.getString("producer.config");
+ String commandConfigFile = res.getString("commandConfigFile");
Integer valuePrefix = res.getInt("valuePrefix");
Long createTime = res.getLong("createTime");
Integer repeatingKeys = res.getInt("repeatingKeys");
@@ -240,14 +250,25 @@ public class VerifiableProducer implements AutoCloseable {
producerProps.put(ProducerConfig.ACKS_CONFIG,
Integer.toString(res.getInt("acks")));
// No producer retries
producerProps.put(ProducerConfig.RETRIES_CONFIG, "0");
+ if (configFile != null && commandConfigFile != null) {
+ throw new ArgumentParserException("Options --producer.config and
--command-config are mutually exclusive.", parser);
+ }
+
if (configFile != null) {
+ System.out.println("Option --producer.config has been deprecated
and will be removed in a future version. Use --command-config instead.");
try {
producerProps.putAll(loadProps(configFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}
}
-
+ if (commandConfigFile != null) {
+ try {
+ producerProps.putAll(loadProps(commandConfigFile));
+ } catch (IOException e) {
+ throw new ArgumentParserException(e.getMessage(), parser);
+ }
+ }
StringSerializer serializer = new StringSerializer();
KafkaProducer<String, String> producer = new
KafkaProducer<>(producerProps, serializer, serializer);