This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit e0483a6f5e329fb3ba1577f0d9e1fce5b6a5e47c
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 1 11:19:12 2026 -0400

    KAFKA-20282: Add message to classic consumer startup guiding to next-gen 
protocol (#21900)
    
    Add info msg if Consumer created using the classic protocol, to guide
    users towards the next-gen Consumer rebalance protocol (GA).  Show only
    if using group rebalance capabilities (group.id defined).
    
    Exclude Streams applications to avoid showing the log on streams (should
    follow  separate evolution to the new streams protocol).
    
    This is the first step in KIP-1274 (evolution path towards the new
    Consumer protocol)
    
    Reviewers: Andrew Schofield <[email protected]>, Lan Ding
     <[email protected]>
---
 .../consumer/internals/ClassicKafkaConsumer.java   | 17 ++++++
 .../kafka/clients/consumer/KafkaConsumerTest.java  | 67 ++++++++++++++++++++++
 .../kafka/tools/consumer/ConsoleConsumer.java      | 13 +++++
 3 files changed, 97 insertions(+)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 5b54a759a98..141ad11d485 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -72,6 +72,7 @@ import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
@@ -211,6 +212,22 @@ public class ClassicKafkaConsumer<K, V> implements 
ConsumerDelegate<K, V> {
                     
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, 
clientId))
             );
 
+            // If the classic rebalance protocol is used, log message to guide 
users towards upgrading to the
+            // next-generation consumer rebalance protocol
+            if (groupId.isPresent()) {
+                boolean isStreamsConsumer = assignors.stream()
+                        .anyMatch(a -> 
a.getClass().getName().contains("StreamsPartitionAssignor"));
+                if (!isStreamsConsumer) {
+                    log.info("\n" +
+                            
"****************************************************************\n" +
+                            "* The consumer rebalance protocol (KIP-848) is 
production ready!\n" +
+                            "* Set the consumer configuration {}={} to try it 
out.\n" +
+                            "* See 
https://kafka.apache.org/documentation/#consumer_rebalance_protocol\n"; +
+                            
"****************************************************************",
+                            ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+                }
+            }
+
             // no coordinator will be constructed for the default (null) group 
id
             if (groupId.isEmpty()) {
                 config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index f70a618bb65..fe0b4c98ff4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -113,6 +113,7 @@ import org.apache.kafka.test.TestUtils;
 
 import org.apache.logging.log4j.Level;
 import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -568,6 +569,72 @@ public class KafkaConsumerTest {
         assertEquals("Class an.invalid.class cannot be found", 
e.getCause().getMessage());
     }
 
+    @Test
+    public void testClassicProtocolLogsRecommendationToTryConsumerProtocol() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+
+        try (LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
+            appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
+            consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+            assertTrue(
+                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+                    "Log message about consumer protocol not showing as 
expected when starting a consumer using the classic protocol"
+            );
+        }
+    }
+
+    @Test
+    public void testConsumerProtocolDoesNotLogRecommendation() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CONSUMER.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+
+        try (LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
+            appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
+            consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+            assertFalse(
+                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+                    "Should not log recommendation when already using consumer 
protocol"
+            );
+        }
+    }
+
+    @Test
+    public void testDefaultProtocolLogsRecommendationToTryConsumerProtocol() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+
+        try (LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
+            appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
+            consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+            assertTrue(
+                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+                    "Log message about consumer protocol not showing as 
expected when starting a consumer using the default (classic) protocol"
+            );
+        }
+    }
+
+    @Test
+    public void testNoGroupIdDoesNotLogGroupProtocolMessage() {
+        Properties props = new Properties();
+        props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.CLASSIC.name());
+        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
"localhost:9999");
+
+        try (LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister()) {
+            appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
+            consumer = newConsumer(props, new StringDeserializer(), new 
StringDeserializer());
+            assertFalse(
+                    appender.getMessages().stream().anyMatch(m -> 
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+                    "Should not log recommendation when no group.id is set"
+            );
+        }
+    }
+
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
     public void testOsDefaultSocketBufferSizes(GroupProtocol groupProtocol) {
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
index 74a928b9727..19251931aea 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
@@ -17,7 +17,9 @@
 package org.apache.kafka.tools.consumer;
 
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.MessageFormatter;
 import org.apache.kafka.common.TopicPartition;
@@ -70,6 +72,7 @@ public class ConsoleConsumer {
     public static void run(ConsoleConsumerOptions opts) {
         messageCount = 0;
         Consumer<byte[], byte[]> consumer = new 
KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new 
ByteArrayDeserializer());
+        maybePrintConsumerProtocolMessage(opts);
         ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts, consumer);
 
         addShutdownHook(consumerWrapper, opts);
@@ -145,6 +148,16 @@ public class ConsoleConsumer {
         return gotError;
     }
 
+    static void maybePrintConsumerProtocolMessage(ConsoleConsumerOptions opts) 
{
+        String protocol = (String) 
opts.consumerProps().get(ConsumerConfig.GROUP_PROTOCOL_CONFIG);
+        if (protocol == null || 
GroupProtocol.CLASSIC.name().equalsIgnoreCase(protocol)) {
+            // Only print if INFO logging is not enabled (otherwise 
ClassicKafkaConsumer already logs it)
+            if 
(!LoggerFactory.getLogger("org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer").isInfoEnabled())
 {
+                System.err.println("The consumer rebalance protocol (KIP-848) 
is production ready! Set group.protocol=consumer to try it out. See 
https://kafka.apache.org/documentation/#consumer_rebalance_protocol";);
+            }
+        }
+    }
+
     public static class ConsumerWrapper {
         final Time time = Time.SYSTEM;
         final long timeoutMs;

Reply via email to