This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad2e3e7435d MINOR: Improve to skip output msg if manual assignment 
option used (#21927)
ad2e3e7435d is described below

commit ad2e3e7435d42ef94a90027f9636c0a9abfe7efa
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 1 16:06:49 2026 -0400

    MINOR: Improve to skip output msg if manual assignment option used (#21927)
    
    Improve console consumer recommendation msg, to skip if using assign
    (partition option provided)
    
    This improvement can only be done in the console consumer because the
    partition option is passed (or not) upfront.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../kafka/clients/consumer/internals/ClassicKafkaConsumer.java      | 2 +-
 .../java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java   | 6 +++---
 .../main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java  | 6 +++++-
 3 files changed, 9 insertions(+), 5 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 883f03baaf0..b013bc21f66 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -221,7 +221,7 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                 if (!isStreamsConsumer) {
                     log.info("\n" +
                             
"****************************************************************\n" +
-                            "* The consumer rebalance protocol (KIP-848) is 
production ready!\n" +
+                            "* The consumer rebalance protocol (KIP-848) is 
production-ready!\n" +
                             "* Set the consumer configuration {}={} to try it 
out.\n" +
                             "* See 
https://kafka.apache.org/documentation/#consumer_rebalance_protocol\n" +
                             
"****************************************************************",
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index bcc22f6da17..ae5dd65a826 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -632,7 +632,7 @@ public class KafkaConsumerTest {
             appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
             consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
             assertTrue(
-                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production-ready!")),
                     "Log message about consumer protocol not showing as 
expected when starting a consumer using the classic protocol"
             );
         }
@@ -649,7 +649,7 @@ public class KafkaConsumerTest {
             appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
             consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
             assertFalse(
-                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production-ready!")),
                     "Should not log recommendation when already using consumer 
protocol"
             );
         }
@@ -665,7 +665,7 @@ public class KafkaConsumerTest {
             appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
             consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
             assertTrue(
-                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production-ready!")),
                     "Log message about consumer protocol not showing as 
expected when starting a consumer using the default (classic) protocol"
             );
         }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
index 19251931aea..a7ae037e987 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
@@ -149,11 +149,15 @@ public class ConsoleConsumer {
     }
 
     static void maybePrintConsumerProtocolMessage(ConsoleConsumerOptions opts) 
{
+        // Skip if manual partition assignment is used (--partition)
+        if (opts.partitionArg().isPresent()) {
+            return;
+        }
         String protocol = (String) 
opts.consumerProps().get(ConsumerConfig.GROUP_PROTOCOL_CONFIG);
         if (protocol == null || 
GroupProtocol.CLASSIC.name().equalsIgnoreCase(protocol)) {
             // Only print if INFO logging is not enabled (otherwise 
ClassicKafkaConsumer already logs it)
             if 
(!LoggerFactory.getLogger("org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer").isInfoEnabled())
 {
-                System.err.println("The consumer rebalance protocol (KIP-848) 
is production ready! Set group.protocol=consumer to try it out. See 
https://kafka.apache.org/documentation/#consumer_rebalance_protocol");
+                System.err.println("The consumer rebalance protocol (KIP-848) 
is production-ready! Set group.protocol=consumer to try it out. See 
https://kafka.apache.org/documentation/#consumer_rebalance_protocol");
             }
         }
     }

Reply via email to