This is an automated email from the ASF dual-hosted git repository. lianetm pushed a commit to branch 4.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit e0483a6f5e329fb3ba1577f0d9e1fce5b6a5e47c Author: Lianet Magrans <[email protected]> AuthorDate: Wed Apr 1 11:19:12 2026 -0400 KAFKA-20282: Add message to classic consumer startup guiding to next-gen protocol (#21900) Add info msg if Consumer created using the classic protocol, to guide users towards the next-gen Consumer rebalance protocol (GA). Show only if using group rebalance capabilities (group.id defined). Exclude Streams applications to avoid showing the log on streams (should follow separate evolution to the new streams protocol). This is the first step in KIP-1274 (evolution path towards the new Consumer protocol) Reviewers: Andrew Schofield <[email protected]>, Lan Ding <[email protected]> --- .../consumer/internals/ClassicKafkaConsumer.java | 17 ++++++ .../kafka/clients/consumer/KafkaConsumerTest.java | 67 ++++++++++++++++++++++ .../kafka/tools/consumer/ConsoleConsumer.java | 13 +++++ 3 files changed, 97 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java index 5b54a759a98..141ad11d485 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java @@ -72,6 +72,7 @@ import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.OptionalLong; @@ -211,6 +212,22 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> { config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG, clientId)) ); + // If the classic rebalance protocol is used, log message to guide users towards upgrading to the + // next-generation consumer rebalance protocol + if (groupId.isPresent()) { + boolean isStreamsConsumer = assignors.stream() + .anyMatch(a -> a.getClass().getName().contains("StreamsPartitionAssignor")); + if (!isStreamsConsumer) { + log.info("\n" + + "****************************************************************\n" + + "* The consumer rebalance protocol (KIP-848) is production ready!\n" + + "* Set the consumer configuration {}={} to try it out.\n" + + "* See https://kafka.apache.org/documentation/#consumer_rebalance_protocol\n" + + "****************************************************************", + ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT)); + } + } + // no coordinator will be constructed for the default (null) group id if (groupId.isEmpty()) { config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index f70a618bb65..fe0b4c98ff4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -113,6 +113,7 @@ import org.apache.kafka.test.TestUtils; import org.apache.logging.log4j.Level; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -568,6 +569,72 @@ public class KafkaConsumerTest { assertEquals("Class an.invalid.class cannot be found", e.getCause().getMessage()); } + @Test + public void testClassicProtocolLogsRecommendationToTryConsumerProtocol() { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + + try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + assertTrue( + appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")), + "Log message about consumer protocol not showing as expected when starting a consumer using the classic protocol" + ); + } + } + + @Test + public void testConsumerProtocolDoesNotLogRecommendation() { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + + try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + assertFalse( + appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")), + "Should not log recommendation when already using consumer protocol" + ); + } + } + + @Test + public void testDefaultProtocolLogsRecommendationToTryConsumerProtocol() { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group"); + + try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + assertTrue( + appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")), + "Log message about consumer protocol not showing as expected when starting a consumer using the default (classic) protocol" + ); + } + } + + @Test + public void testNoGroupIdDoesNotLogGroupProtocolMessage() { + Properties props = new Properties(); + props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CLASSIC.name()); + props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999"); + + try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) { + appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO); + consumer = newConsumer(props, new StringDeserializer(), new StringDeserializer()); + assertFalse( + appender.getMessages().stream().anyMatch(m -> m.contains("The consumer rebalance protocol (KIP-848) is production ready!")), + "Should not log recommendation when no group.id is set" + ); + } + } + @ParameterizedTest @EnumSource(GroupProtocol.class) public void testOsDefaultSocketBufferSizes(GroupProtocol groupProtocol) { diff --git a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java index 74a928b9727..19251931aea 100644 --- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java @@ -17,7 +17,9 @@ package org.apache.kafka.tools.consumer; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.GroupProtocol; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.MessageFormatter; import org.apache.kafka.common.TopicPartition; @@ -70,6 +72,7 @@ public class ConsoleConsumer { public static void run(ConsoleConsumerOptions opts) { messageCount = 0; Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new ByteArrayDeserializer()); + maybePrintConsumerProtocolMessage(opts); ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts, consumer); addShutdownHook(consumerWrapper, opts); @@ -145,6 +148,16 @@ public class ConsoleConsumer { return gotError; } + static void maybePrintConsumerProtocolMessage(ConsoleConsumerOptions opts) { + String protocol = (String) opts.consumerProps().get(ConsumerConfig.GROUP_PROTOCOL_CONFIG); + if (protocol == null || GroupProtocol.CLASSIC.name().equalsIgnoreCase(protocol)) { + // Only print if INFO logging is not enabled (otherwise ClassicKafkaConsumer already logs it) + if (!LoggerFactory.getLogger("org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer").isInfoEnabled()) { + System.err.println("The consumer rebalance protocol (KIP-848) is production ready! Set group.protocol=consumer to try it out. See https://kafka.apache.org/documentation/#consumer_rebalance_protocol"); + } + } + } + public static class ConsumerWrapper { final Time time = Time.SYSTEM; final long timeoutMs;
