This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ad2e3e7435d MINOR: Improve to skip output msg if manual assignment
option used (#21927)
ad2e3e7435d is described below
commit ad2e3e7435d42ef94a90027f9636c0a9abfe7efa
Author: Lianet Magrans <[email protected]>
AuthorDate: Wed Apr 1 16:06:49 2026 -0400
MINOR: Improve to skip output msg if manual assignment option used (#21927)
Improve the console consumer recommendation msg so that it is skipped when
using manual assignment (partition option provided)
This improvement can only be done in the console consumer because the
partition option is passed (or not) upfront.
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/clients/consumer/internals/ClassicKafkaConsumer.java | 2 +-
.../java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java | 6 +++---
.../main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java | 6 +++++-
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index 883f03baaf0..b013bc21f66 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -221,7 +221,7 @@ public class ClassicKafkaConsumer<K, V> implements
ConsumerDelegate<K, V> {
if (!isStreamsConsumer) {
log.info("\n" +
"****************************************************************\n" +
- "* The consumer rebalance protocol (KIP-848) is
production ready!\n" +
+ "* The consumer rebalance protocol (KIP-848) is
production-ready!\n" +
"* Set the consumer configuration {}={} to try it
out.\n" +
"* See
https://kafka.apache.org/documentation/#consumer_rebalance_protocol\n" +
"****************************************************************",
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index bcc22f6da17..ae5dd65a826 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -632,7 +632,7 @@ public class KafkaConsumerTest {
appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
assertTrue(
- appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+ appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production-ready!")),
"Log message about consumer protocol not showing as
expected when starting a consumer using the classic protocol"
);
}
@@ -649,7 +649,7 @@ public class KafkaConsumerTest {
appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
assertFalse(
- appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+ appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production-ready!")),
"Should not log recommendation when already using consumer
protocol"
);
}
@@ -665,7 +665,7 @@ public class KafkaConsumerTest {
appender.setClassLogger(ClassicKafkaConsumer.class, Level.INFO);
consumer = newConsumer(props, new StringDeserializer(), new
StringDeserializer());
assertTrue(
- appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production ready!")),
+ appender.getMessages().stream().anyMatch(m ->
m.contains("The consumer rebalance protocol (KIP-848) is production-ready!")),
"Log message about consumer protocol not showing as
expected when starting a consumer using the default (classic) protocol"
);
}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
index 19251931aea..a7ae037e987 100644
--- a/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/consumer/ConsoleConsumer.java
@@ -149,11 +149,15 @@ public class ConsoleConsumer {
}
static void maybePrintConsumerProtocolMessage(ConsoleConsumerOptions opts)
{
+ // Skip if manual partition assignment is used (--partition)
+ if (opts.partitionArg().isPresent()) {
+ return;
+ }
String protocol = (String)
opts.consumerProps().get(ConsumerConfig.GROUP_PROTOCOL_CONFIG);
if (protocol == null ||
GroupProtocol.CLASSIC.name().equalsIgnoreCase(protocol)) {
// Only print if INFO logging is not enabled (otherwise
ClassicKafkaConsumer already logs it)
if
(!LoggerFactory.getLogger("org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer").isInfoEnabled())
{
- System.err.println("The consumer rebalance protocol (KIP-848)
is production ready! Set group.protocol=consumer to try it out. See
https://kafka.apache.org/documentation/#consumer_rebalance_protocol");
+ System.err.println("The consumer rebalance protocol (KIP-848)
is production-ready! Set group.protocol=consumer to try it out. See
https://kafka.apache.org/documentation/#consumer_rebalance_protocol");
}
}
}