This is an automated email from the ASF dual-hosted git repository. lianetm pushed a commit to branch 4.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit f8d5f730e2da23d7f1f39148471a8da4337cd268 Author: Lianet Magrans <[email protected]> AuthorDate: Wed Apr 1 16:06:49 2026 -0400 MINOR: Improve to skip output msg if manual assignment option used (#21927) Improve the console consumer recommendation message so that it is skipped when manual assignment is used (partition option provided). This improvement can only be done in the console consumer because the partition option is passed (or not) upfront. Reviewers: Andrew Schofield <[email protected]> --- .../kafka/clients/consumer/internals/ClassicKafkaConsumer.java | 2 +- .../java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 6 +++--- .../main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java | 6 +++++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 141ad11d485..8ebdc48bcd2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -220,7 +220,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { if (!isStreamsConsumer) { log.info("\n" + "****************************************************************\n" + - "* The consumer rebalance protocol (KIP-848) is production ready!\n" + + "* The consumer rebalance protocol (KIP-848) is production-ready!\n" + "* Set the consumer configuration {}={} to try it out.\n" + "* See https://kafka.apache.org/documentation/#consumer_rebalance_protocol\n" + "****************************************************************", diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index fe0b4c98ff4..776b11c86c5 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -580,7 +580,7 @@ public class KafkaConsumerTest { appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO); consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); assertTrue( - appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")), + appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production-ready!")), "Log message about consumer protocol not showing as expected when starting a consumer using the classic protocol" ); } @@ -597,7 +597,7 @@ public class KafkaConsumerTest { appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO); consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); assertFalse( - appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")), + appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production-ready!")), "Should not log recommendation when already using consumer protocol" ); } @@ -613,7 +613,7 @@ public class KafkaConsumerTest { appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO); consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); assertTrue( - appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")), + appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production-ready!")), "Log message about consumer protocol not showing as expected when starting a consumer using the default (classic) protocol" ); } diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java index 19251931aea..a7ae037e987 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java @@ -149,11 +149,15 @@ public class ConsoleConsumer { } static void maybePrintConsumerProtocolMessage(ConsoleConsumerOptions opts) { + // Skip if using manual partition assignment used (--partition) + if (opts.partitionArg().isPresent()) { + return; + } String protocol = (String) opts.consumerProps().get(ConsumerConfig.GROUP_PROTOCOL_CONFIG); if (protocol == null || GroupProtocol.CLASSIC.name().equalsIgnoreCase(protocol)) { // Only print if INFO logging is not enabled (otherwise ClassicKafkaConsumer already logs it) if (!LoggerFactory.getLogger("org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer").isInfoEnabled()) { - System.err.println("The consumer rebalance protocol (KIP-848) is production ready! Set group.protocol=consumer to try it out. See https://kafka.apache.org/documentation/#consumer_rebalance_protocol"); + System.err.println("The consumer rebalance protocol (KIP-848) is production-ready! Set group.protocol=consumer to try it out. See https://kafka.apache.org/documentation/#consumer_rebalance_protocol"); } } }
