This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 58d208f7b95 KAFKA-20282: Add message to classic consumer startup
guiding to next-gen protocol (#21900)
58d208f7b95 is described below
commit 58d208f7b95b93eff238f42b67153792207eb726
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 1 11:19:12 2026 -0400
KAFKA-20282: Add message to classic consumer startup guiding to next-gen
protocol (#21900)
Add info msg if Consumer created using the classic protocol, to guide
users towards the next-gen Consumer rebalance protocol (GA). Show only
if using group rebalance capabilities (group.id defined).
Exclude Streams applications to avoid showing the log on streams (should
follow separate evolution to the new streams protocol).
This is the first step in KIP-1274 (evolution path towards the new
Consumer protocol)
Reviewers: Andrew Schofield <[email protected]>, Lan Ding
<[email protected]>
---
.../consumer/internals/ClassicKafkaConsumer.java | 17 ++++++
.../kafka/clients/consumer/KafkaConsumerTest.java | 67 ++++++++++++++++++++++
.../kafka/tools/consumer/ConsoleConsumer.java | 13 +++++
3 files changed, 97 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index b8e181743a2..883f03baaf0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -72,6 +72,7 @@ import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
@@ -212,6 +213,22 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
config.originals(Collections.singletonMap(ConsumerConfig.CLIENT_ID_CONFIG,
clientId))
);
+ // If the classic rebalance protocol is used, log message to guide
users towards upgrading to the
+ // next-generation consumer rebalance protocol
+ if (groupId.isPresent()) {
+ boolean isStreamsConsumer = assignors.stream()
+ .anyMatch(a ->
a.getClass().getName().contains("StreamsPartitionAssignor"));
+ if (!isStreamsConsumer) {
+ log.info("\n" +
+
"****************************************************************\n" +
+ "* The consumer rebalance protocol (KIP-848) is
production ready!\n" +
+ "* Set the consumer configuration {}={} to try it
out.\n" +
+ "* See
https://kafka.apache.org/documentation/#consumer_rebalance_protocol\n" +
+
"****************************************************************",
+ ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
+ }
+ }
+
// no coordinator will be constructed for the default (null) group
id
if (groupId.isEmpty()) {
config.ignore(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 498f1fcbf41..bcc22f6da17 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -113,6 +113,7 @@ import org.apache.kafka.test.TestUtils;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -620,6 +621,72 @@ public class KafkaConsumerTest {
assertEquals("Class an.invalid.class cannot be found",
e.getCause().getMessage());
}
+ @Test
+ public void testClassicProtocolLogsRecommendationToTryConsumerProtocol() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+
+ try (LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+ assertTrue(
+ appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+ "Log message about consumer protocol not showing as
expected when starting a consumer using the classic protocol"
+ );
+ }
+ }
+
+ @Test
+ public void testConsumerProtocolDoesNotLogRecommendation() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CONSUMER.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+
+ try (LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+ assertFalse(
+ appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+ "Should not log recommendation when already using consumer
protocol"
+ );
+ }
+ }
+
+ @Test
+ public void testDefaultProtocolLogsRecommendationToTryConsumerProtocol() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+ props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
+
+ try (LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+ assertTrue(
+ appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+ "Log message about consumer protocol not showing as
expected when starting a consumer using the default (classic) protocol"
+ );
+ }
+ }
+
+ @Test
+ public void testNoGroupIdDoesNotLogGroupProtocolMessage() {
+ Properties props = new Properties();
+ props.setProperty(ConsumerConfig.GROUP_PROTOCOL_CONFIG,
GroupProtocol.CLASSIC.name());
+ props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9999");
+
+ try (LogCaptureAppender appender =
LogCaptureAppender.createAndRegister()) {
+ appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
+ consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
+ assertFalse(
+ appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+ "Should not log recommendation when no group.id is set"
+ );
+ }
+ }
+
@ParameterizedTest
@EnumSource(GroupProtocol.class)
public void testOsDefaultSocketBufferSizes(GroupProtocol groupProtocol) {
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
index 74a928b9727..19251931aea 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
@@ -17,7 +17,9 @@
package org.apache.kafka.tools.consumer;
import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.MessageFormatter;
import org.apache.kafka.common.TopicPartition;
@@ -70,6 +72,7 @@ public class ConsoleConsumer {
public static void run(ConsoleConsumerOptions opts) {
messageCount = 0;
Consumer<byte[], byte[]> consumer = new
KafkaConsumer<>(opts.consumerProps(), new ByteArrayDeserializer(), new
ByteArrayDeserializer());
+ maybePrintConsumerProtocolMessage(opts);
ConsumerWrapper consumerWrapper = new ConsumerWrapper(opts, consumer);
addShutdownHook(consumerWrapper, opts);
@@ -145,6 +148,16 @@ public class ConsoleConsumer {
return gotError;
}
+ static void maybePrintConsumerProtocolMessage(ConsoleConsumerOptions opts)
{
+ String protocol = (String)
opts.consumerProps().get(ConsumerConfig.GROUP_PROTOCOL_CONFIG);
+ if (protocol == null ||
GroupProtocol.CLASSIC.name().equalsIgnoreCase(protocol)) {
+ // Only print if INFO logging is not enabled (otherwise
ClassicKafkaConsumer already logs it)
+ if
(!LoggerFactory.getLogger("org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer").isInfoEnabled())
{
+ System.err.println("The consumer rebalance protocol (KIP-848)
is production ready! Set group.protocol=consumer to try it out. See
https://kafka.apache.org/documentation/#consumer_rebalance_protocol");
+ }
+ }
+ }
+
public static class ConsumerWrapper {
final Time time = Time.SYSTEM;
final long timeoutMs;