Yunyung commented on code in PR #20479:
URL: https://github.com/apache/kafka/pull/20479#discussion_r2331457224


##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptions.java:
##########
@@ -58,11 +58,21 @@ public ConsoleShareConsumerOptions(String[] args) throws 
IOException {
                 .withRequiredArg()
                 .describedAs("topic")
                 .ofType(String.class);
-        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "A mechanism to pass user-defined 
properties in the form key=value to the consumer.")
+        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass 
user-defined properties in the form key=value to the consumer. " +

Review Comment:
   Ditto: please add a deprecation tag here as well, e.g. `@Deprecated(since = "4.2", forRemoval = true)`.



##########
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleShareConsumerOptionsTest.java:
##########
@@ -271,4 +271,180 @@ public void testRejectAndReleaseOption() throws 
IOException {
             Exit.resetExitProcedure();
         }
     }
+
+    @Test
+    public void shouldExitOnBothConsumerPropertyAndCommandProperty() {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--consumer-property", "session.timeout.ms=10000",
+            "--command-property", "request.timeout.ms=30000"
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldExitOnBothConsumerConfigAndCommandConfig() throws 
IOException {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put("request.timeout.ms", "1000");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+
+        Map<String, String> configs2 = new HashMap<>();
+        configs2.put("session.timeout.ms", "10000");
+        File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--consumer-config", propsFile.getAbsolutePath(),
+            "--command-config", propsFile2.getAbsolutePath()
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldParseValidConsumerConfigWithSessionTimeout() throws 
IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "session.timeout.ms=10000"
+        };
+
+        ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("localhost:9092", config.bootstrapServer());
+        assertEquals("test", config.topicArg());
+        assertEquals("10000", 
consumerProperties.getProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG));
+    }
+
+    @Test
+    public void shouldParseConfigsFromFile() throws IOException {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("request.timeout.ms", "1000");
+        configs.put("group.id", "group1");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args);
+
+        // KafkaShareConsumer uses Utils.propsToMap to convert the properties 
to a map,
+        // so using the same method to check the map has the expected values
+        Map<String, Object> configMap = 
Utils.propsToMap(config.consumerProps());
+        assertEquals("1000", configMap.get("request.timeout.ms"));
+        assertEquals("group1", configMap.get("group.id"));
+    }
+
+    @Test
+    public void groupIdsProvidedInDifferentPlacesMustMatch() throws 
IOException {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        // different in all three places
+        File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+        final String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "group-from-arguments",
+            "--command-property", "group.id=group-from-properties",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args));
+
+        // the same in all three places
+        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"test-group"));
+        final String[] args1 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "test-group",
+            "--command-property", "group.id=test-group",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        ConsoleShareConsumerOptions config = new 
ConsoleShareConsumerOptions(args1);
+        Properties props = config.consumerProps();
+        assertEquals("test-group", props.getProperty("group.id"));
+
+        // different via --command-property and --command-config
+        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+        final String[] args2 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "group.id=group-from-properties",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args2));
+
+        // different via --command-property and --group
+        final String[] args3 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "group-from-arguments",
+            "--command-property", "group.id=group-from-properties"
+        };
+
+        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args3));
+
+        // different via --group and --command-config
+        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+        final String[] args4 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "group-from-arguments",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleShareConsumerOptions(args4));
+
+        // via --group only
+        final String[] args5 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "group-from-arguments"
+        };
+
+        config = new ConsoleShareConsumerOptions(args5);
+        props = config.consumerProps();
+        assertEquals("group-from-arguments", props.getProperty("group.id"));
+
+        Exit.resetExitProcedure();

Review Comment:
   This should be in a finally {} block.



##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##########
@@ -87,11 +87,21 @@ public ConsoleConsumerOptions(String[] args) throws 
IOException {
                 .describedAs("consume offset")
                 .ofType(String.class)
                 .defaultsTo("latest");
-        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "A mechanism to pass user-defined 
properties in the form key=value to the consumer.")
+        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass 
user-defined properties in the form key=value to the consumer." +
+                        "This option will be removed in a future version. Use 
--command-property instead.")
                 .withRequiredArg()
                 .describedAs("consumer_prop")
                 .ofType(String.class);
-        OptionSpec<String> consumerConfigOpt = 
parser.accepts("consumer.config", "Consumer config properties file. Note that " 
+ consumerPropertyOpt + " takes precedence over this config.")
+        OptionSpec<String> commandPropertyOpt = 
parser.accepts("command-property", "A mechanism to pass user-defined properties 
in the form key=value to the consumer.")
+                .withRequiredArg()
+                .describedAs("consumer_prop")
+                .ofType(String.class);
+        OptionSpec<String> consumerConfigOpt = 
parser.accepts("consumer.config", "(DEPRECATED) Consumer config properties 
file. Note that " + commandPropertyOpt + " takes precedence over this config." +

Review Comment:
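   Missing space: add a space after "this config." so it does not run into the concatenated sentence that follows.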



##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -132,7 +132,9 @@ static final class ConsoleProducerOptions extends 
CommandDefaultOptions {
         private final OptionSpec<String> propertyOpt;
         private final OptionSpec<String> readerConfigOpt;
         private final OptionSpec<String> producerPropertyOpt;

Review Comment:
   Please add a deprecation tag, e.g. `@Deprecated(since = "4.2", forRemoval = true)`.



##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##########
@@ -87,11 +87,21 @@ public ConsoleConsumerOptions(String[] args) throws 
IOException {
                 .describedAs("consume offset")
                 .ofType(String.class)
                 .defaultsTo("latest");
-        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "A mechanism to pass user-defined 
properties in the form key=value to the consumer.")
+        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass 
user-defined properties in the form key=value to the consumer." +

Review Comment:
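   Missing space: add a space after "to the consumer." so it does not run into the concatenated sentence that follows.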



##########
tools/src/test/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptionsTest.java:
##########
@@ -618,4 +618,232 @@ private String[] generateArgsForFormatter(String 
formatter) {
             "--formatter", formatter,
         };
     }
+
+    @Test
+    public void shouldExitOnBothConsumerPropertyAndCommandProperty() {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--consumer-property", "auto.offset.reset=latest",
+            "--command-property", "session.timeout.ms=10000"
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldExitOnBothConsumerConfigAndCommandConfig() throws 
IOException {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        Map<String, String> configs = new HashMap<>();
+        configs.put("request.timeout.ms", "1000");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+
+        Map<String, String> configs2 = new HashMap<>();
+        configs2.put("session.timeout.ms", "10000");
+        File propsFile2 = ToolsTestUtils.tempPropertiesFile(configs2);
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--consumer.config", propsFile.getAbsolutePath(),
+            "--command-config", propsFile2.getAbsolutePath()
+        };
+
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void 
shouldParseValidConsumerConfigWithAutoOffsetResetLatestUsingCommandProperty() 
throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "auto.offset.reset=latest"
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("localhost:9092", config.bootstrapServer());
+        assertEquals("test", config.topicArg().orElse(""));
+        assertFalse(config.fromBeginning());
+        assertEquals("latest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+    }
+
+    @Test
+    public void 
shouldParseValidConsumerConfigWithAutoOffsetResetEarliestUsingCommandProperty() 
throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "auto.offset.reset=earliest"
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("localhost:9092", config.bootstrapServer());
+        assertEquals("test", config.topicArg().orElse(""));
+        assertFalse(config.fromBeginning());
+        assertEquals("earliest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+    }
+
+    @Test
+    public void 
shouldParseValidConsumerConfigWithAutoOffsetResetAndMatchingFromBeginningUsingCommandProperty()
 throws IOException {
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "auto.offset.reset=earliest",
+            "--from-beginning"
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+        Properties consumerProperties = config.consumerProps();
+
+        assertEquals("localhost:9092", config.bootstrapServer());
+        assertEquals("test", config.topicArg().orElse(""));
+        assertTrue(config.fromBeginning());
+        assertEquals("earliest", 
consumerProperties.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG));
+    }
+
+    @Test
+    public void 
shouldExitOnInvalidConfigWithAutoOffsetResetAndConflictingFromBeginningUsingCommandProperty()
 {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "auto.offset.reset=latest",
+            "--from-beginning"
+        };
+        try {
+            assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
+        } finally {
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @Test
+    public void shouldParseConfigsFromFileUsingCommandConfig() throws 
IOException {
+        Map<String, String> configs = new HashMap<>();
+        configs.put("request.timeout.ms", "1000");
+        configs.put("group.id", "group1");
+        File propsFile = ToolsTestUtils.tempPropertiesFile(configs);
+        String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args);
+        assertEquals("1000", config.consumerProps().get("request.timeout.ms"));
+        assertEquals("group1", config.consumerProps().get("group.id"));
+    }
+
+    @Test
+    public void groupIdsProvidedInDifferentPlacesMustMatchUsingCommandConfig() 
throws IOException {
+        Exit.setExitProcedure((code, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+
+        // different in all three places
+        File propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+        final String[] args = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "group-from-arguments",
+            "--command-property", "group.id=group-from-properties",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args));
+
+        // the same in all three places
+        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"test-group"));
+        final String[] args1 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "test-group",
+            "--command-property", "group.id=test-group",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        ConsoleConsumerOptions config = new ConsoleConsumerOptions(args1);
+        Properties props = config.consumerProps();
+        assertEquals("test-group", props.getProperty("group.id"));
+
+        // different via --command-property and --command-config
+        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+        final String[] args2 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--command-property", "group.id=group-from-properties",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+
+        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args2));
+
+        // different via --command-property and --group
+        final String[] args3 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "group-from-arguments",
+            "--command-property", "group.id=group-from-properties"
+        };
+
+        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args3));
+
+        // different via --group and --command-config
+        propsFile = ToolsTestUtils.tempPropertiesFile(Map.of("group.id", 
"group-from-file"));
+        final String[] args4 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "group-from-arguments",
+            "--command-config", propsFile.getAbsolutePath()
+        };
+        assertThrows(IllegalArgumentException.class, () -> new 
ConsoleConsumerOptions(args4));
+
+        // via --group only
+        final String[] args5 = new String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--topic", "test",
+            "--group", "group-from-arguments"
+        };
+
+        config = new ConsoleConsumerOptions(args5);
+        props = config.consumerProps();
+        assertEquals("group-from-arguments", props.getProperty("group.id"));
+
+        Exit.resetExitProcedure();

Review Comment:
   Ditto: `Exit.resetExitProcedure()` should be in a `finally {}` block.



##########
tools/src/main/java/org/apache/kafka/tools/ConsoleProducer.java:
##########
@@ -250,11 +252,20 @@ public ConsoleProducerOptions(String[] args) {
                     .withRequiredArg()
                     .describedAs("config file")
                     .ofType(String.class);
-            producerPropertyOpt = parser.accepts("producer-property", "A 
mechanism to pass user-defined properties in the form key=value to the 
producer. ")
+            producerPropertyOpt = parser.accepts("producer-property", 
"(DEPRECATED) A mechanism to pass user-defined properties in the form key=value 
to the producer." +

Review Comment:
   Missing space after "to the producer." before the concatenated sentence:
   ```suggestion
               producerPropertyOpt = parser.accepts("producer-property", 
"(DEPRECATED) A mechanism to pass user-defined properties in the form key=value 
to the producer. " +
   ```



##########
tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumerOptions.java:
##########
@@ -87,11 +87,21 @@ public ConsoleConsumerOptions(String[] args) throws 
IOException {
                 .describedAs("consume offset")
                 .ofType(String.class)
                 .defaultsTo("latest");
-        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "A mechanism to pass user-defined 
properties in the form key=value to the consumer.")
+        OptionSpec<String> consumerPropertyOpt = 
parser.accepts("consumer-property", "(DEPRECATED) A mechanism to pass 
user-defined properties in the form key=value to the consumer." +

Review Comment:
   Ditto: please add a deprecation tag here as well, e.g. `@Deprecated(since = "4.2", forRemoval = true)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to