Yunyung commented on code in PR #20479:
URL: https://github.com/apache/kafka/pull/20479#discussion_r2331457224
##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java:
##########
@@ -58,11 +58,21 @@ public ConsoleShareConsumerOptions(String[] args) throws
IOException {
.withRequiredArg()
.describedAs("topic")
.ofType(String.class);
- OptionSpec<String> consumerPropertyOpt =
parser.accepts("consumer-property", "A mechanism to pass user-defined
properties in the form key=value to the consumer.")
+ OptionSpec<String> consumerPropertyOpt =
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass
user-defined properties in the form key=value to the consumer. " +
Review Comment:
Ditto. Please add a deprecation tag here as well.
##########
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java:
##########
@@ -271,4 +271,180 @@ public void testRejectAndReleaseOption() throws
IOException {
Exit.resetExitProcedure();
}
}
+
+ @Test
+ public void shouldExitOnBothConsumerPropertyAndCommandProperty() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer-property", "session.timeout.ms=10000",
+ "--command-property", "request.timeout.ms=30000"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleShareConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldExitOnBothConsumerConfigAndCommandConfig() throws
IOException {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ Map<String, String> configs = new HashMap<>();
+ configs.put("request.timeout.ms", "1000");
+ File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+
+ Map<String, String> configs2 = new HashMap<>();
+ configs2.put("session.timeout.ms", "10000");
+ File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer-config", propsFile.getAbsolutePath(),
+ "--command-config", propsFile2.getAbsolutePath()
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleShareConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldParseValidConsumerConfigWithSessionTimeout() throws
IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--command-property", "session.timeout.ms=10000"
+ };
+
+ ConsoleShareConsumerOptions config = new
ConsoleShareConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg());
+ assertEquals("10000",
consumerProperties.getProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG));
+ }
+
+ @Test
+ public void shouldParseConfigsFromFile() throws IOException {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("request.timeout.ms", "1000");
+ configs.put("group.id", "group1");
+ File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+
+ ConsoleShareConsumerOptions config = new
ConsoleShareConsumerOptions(args);
+
+ // KafkaShareConsumer uses Utils.propsToMap to convert the properties
to a map,
+ // so using the same method to check the map has the expected values
+ Map<String, Object> configMap =
Utils.propsToMap(config.consumerProps());
+ assertEquals("1000", configMap.get("request.timeout.ms"));
+ assertEquals("group1", configMap.get("group.id"));
+ }
+
+ @Test
+ public void groupIdsProvidedInDifferentPlacesMustMatch() throws
IOException {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ // different in all three places
+ File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id",
"group-from-file"));
+ final String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments",
+ "--command-property", "group.id=group-from-properties",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleShareConsumerOptions(args));
+
+ // the same in all three places
+ propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id",
"test-group"));
+ final String[] args1 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "test-group",
+ "--command-property", "group.id=test-group",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+
+ ConsoleShareConsumerOptions config = new
ConsoleShareConsumerOptions(args1);
+ Properties props = config.consumerProps();
+ assertEquals("test-group", props.getProperty("group.id"));
+
+ // different via --command-property and --command-config
+ propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id",
"group-from-file"));
+ final String[] args2 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--command-property", "group.id=group-from-properties",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleShareConsumerOptions(args2));
+
+ // different via --command-property and --group
+ final String[] args3 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments",
+ "--command-property", "group.id=group-from-properties"
+ };
+
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleShareConsumerOptions(args3));
+
+ // different via --group and --command-config
+ propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id",
"group-from-file"));
+ final String[] args4 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleShareConsumerOptions(args4));
+
+ // via --group only
+ final String[] args5 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments"
+ };
+
+ config = new ConsoleShareConsumerOptions(args5);
+ props = config.consumerProps();
+ assertEquals("group-from-arguments", props.getProperty("group.id"));
+
+ Exit.resetExitProcedure();
Review Comment:
This should be in a finally {} block.
##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##########
@@ -87,11 +87,21 @@ public ConsoleConsumerOptions(String[] args) throws
IOException {
.describedAs("consume offset")
.ofType(String.class)
.defaultsTo("latest");
- OptionSpec<String> consumerPropertyOpt =
parser.accepts("consumer-property", "A mechanism to pass user-defined
properties in the form key=value to the consumer.")
+ OptionSpec<String> consumerPropertyOpt =
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass
user-defined properties in the form key=value to the consumer." +
+ "This option will be removed in a future version. Use
--command-property instead.")
.withRequiredArg()
.describedAs("consumer_prop")
.ofType(String.class);
- OptionSpec<String> consumerConfigOpt =
parser.accepts("consumer.config", "Consumer config properties file. Note that "
+ consumerPropertyOpt + " takes precedence over this config.")
+ OptionSpec<String> commandPropertyOpt =
parser.accepts("command-property", "A mechanism to pass user-defined properties
in the form key=value to the consumer.")
+ .withRequiredArg()
+ .describedAs("consumer_prop")
+ .ofType(String.class);
+ OptionSpec<String> consumerConfigOpt =
parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties
file. Note that " + commandPropertyOpt + " takes precedence over this config." +
Review Comment:
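Space: add a trailing space after "config." before the string concatenation so the help text does not run together.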
##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -132,7 +132,9 @@ static final class ConsoleProducerOptions extends
CommandDefaultOptions {
private final OptionSpec<String> propertyOpt;
private final OptionSpec<String> readerConfigOpt;
private final OptionSpec<String> producerPropertyOpt;
Review Comment:
Please add a deprecation tag, like @Deprecated(since = "4.2", forRemoval = true)
##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##########
@@ -87,11 +87,21 @@ public ConsoleConsumerOptions(String[] args) throws
IOException {
.describedAs("consume offset")
.ofType(String.class)
.defaultsTo("latest");
- OptionSpec<String> consumerPropertyOpt =
parser.accepts("consumer-property", "A mechanism to pass user-defined
properties in the form key=value to the consumer.")
+ OptionSpec<String> consumerPropertyOpt =
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass
user-defined properties in the form key=value to the consumer." +
Review Comment:
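Space: add a trailing space after "consumer." before the string concatenation so the help text does not read "consumer.This option ...".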
##########
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java:
##########
@@ -618,4 +618,232 @@ private String[] generateArgsForFormatter(String
formatter) {
"--formatter", formatter,
};
}
+
+ @Test
+ public void shouldExitOnBothConsumerPropertyAndCommandProperty() {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer-property", "auto.offset.reset=latest",
+ "--command-property", "session.timeout.ms=10000"
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldExitOnBothConsumerConfigAndCommandConfig() throws
IOException {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ Map<String, String> configs = new HashMap<>();
+ configs.put("request.timeout.ms", "1000");
+ File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+
+ Map<String, String> configs2 = new HashMap<>();
+ configs2.put("session.timeout.ms", "10000");
+ File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--consumer.config", propsFile.getAbsolutePath(),
+ "--command-config", propsFile2.getAbsolutePath()
+ };
+
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void
shouldParseValidConsumerConfigWithAutoOffsetResetLatestUsingCommandProperty()
throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--command-property", "auto.offset.reset=latest"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg().orElse(""));
+ assertFalse(config.fromBeginning());
+ assertEquals("latest",
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ }
+
+ @Test
+ public void
shouldParseValidConsumerConfigWithAutoOffsetResetEarliestUsingCommandProperty()
throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--command-property", "auto.offset.reset=earliest"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg().orElse(""));
+ assertFalse(config.fromBeginning());
+ assertEquals("earliest",
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ }
+
+ @Test
+ public void
shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginningUsingCommandProperty()
throws IOException {
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--command-property", "auto.offset.reset=earliest",
+ "--from-beginning"
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ Properties consumerProperties = config.consumerProps();
+
+ assertEquals("localhost:9092", config.bootstrapServer());
+ assertEquals("test", config.topicArg().orElse(""));
+ assertTrue(config.fromBeginning());
+ assertEquals("earliest",
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+ }
+
+ @Test
+ public void
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningUsingCommandProperty()
{
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--command-property", "auto.offset.reset=latest",
+ "--from-beginning"
+ };
+ try {
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleConsumerOptions(args));
+ } finally {
+ Exit.resetExitProcedure();
+ }
+ }
+
+ @Test
+ public void shouldParseConfigsFromFileUsingCommandConfig() throws
IOException {
+ Map<String, String> configs = new HashMap<>();
+ configs.put("request.timeout.ms", "1000");
+ configs.put("group.id", "group1");
+ File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+ String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+ assertEquals("1000", config.consumerProps().get("request.timeout.ms"));
+ assertEquals("group1", config.consumerProps().get("group.id"));
+ }
+
+ @Test
+ public void groupIdsProvidedInDifferentPlacesMustMatchUsingCommandConfig()
throws IOException {
+ Exit.setExitProcedure((code, message) -> {
+ throw new IllegalArgumentException(message);
+ });
+
+ // different in all three places
+ File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id",
"group-from-file"));
+ final String[] args = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments",
+ "--command-property", "group.id=group-from-properties",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleConsumerOptions(args));
+
+ // the same in all three places
+ propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id",
"test-group"));
+ final String[] args1 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "test-group",
+ "--command-property", "group.id=test-group",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+
+ ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1);
+ Properties props = config.consumerProps();
+ assertEquals("test-group", props.getProperty("group.id"));
+
+ // different via --command-property and --command-config
+ propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id",
"group-from-file"));
+ final String[] args2 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--command-property", "group.id=group-from-properties",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleConsumerOptions(args2));
+
+ // different via --command-property and --group
+ final String[] args3 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments",
+ "--command-property", "group.id=group-from-properties"
+ };
+
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleConsumerOptions(args3));
+
+ // different via --group and --command-config
+ propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id",
"group-from-file"));
+ final String[] args4 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments",
+ "--command-config", propsFile.getAbsolutePath()
+ };
+ assertThrows(IllegalArgumentException.class, () -> new
ConsoleConsumerOptions(args4));
+
+ // via --group only
+ final String[] args5 = new String[]{
+ "--bootstrap-server", "localhost:9092",
+ "--topic", "test",
+ "--group", "group-from-arguments"
+ };
+
+ config = new ConsoleConsumerOptions(args5);
+ props = config.consumerProps();
+ assertEquals("group-from-arguments", props.getProperty("group.id"));
+
+ Exit.resetExitProcedure();
Review Comment:
Ditto. Exit.resetExitProcedure() should be in a finally {} block.
##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -250,11 +252,20 @@ public ConsoleProducerOptions(String[] args) {
.withRequiredArg()
.describedAs("config file")
.ofType(String.class);
- producerPropertyOpt = parser.accepts("producer-property", "A
mechanism to pass user-defined properties in the form key=value to the
producer. ")
+ producerPropertyOpt = parser.accepts("producer-property",
"(DEPRECATED) A mechanism to pass user-defined properties in the form key=value
to the producer." +
Review Comment:
space
```suggestion
producerPropertyOpt = parser.accepts("producer-property",
"(DEPRECATED) A mechanism to pass user-defined properties in the form key=value
to the producer. " +
```
##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##########
@@ -87,11 +87,21 @@ public ConsoleConsumerOptions(String[] args) throws
IOException {
.describedAs("consume offset")
.ofType(String.class)
.defaultsTo("latest");
- OptionSpec<String> consumerPropertyOpt =
parser.accepts("consumer-property", "A mechanism to pass user-defined
properties in the form key=value to the consumer.")
+ OptionSpec<String> consumerPropertyOpt =
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass
user-defined properties in the form key=value to the consumer." +
Review Comment:
Ditto. Please add a deprecation tag here as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]