Kirk True created KAFKA-16312:
---------------------------------
Summary: ConsumerRebalanceListener.onPartitionsAssigned should be
called after joining, even if empty
Key: KAFKA-16312
URL: https://issues.apache.org/jira/browse/KAFKA-16312
Project: Kafka
Issue Type: Bug
Components: clients, consumer
Affects Versions: 3.7.0
Reporter: Kirk True
Assignee: Lianet Magrans
Fix For: 3.8.0
There is a difference between the {{LegacyKafkaConsumer}} and
{{AsyncKafkaConsumer}} with respect to when the
{{ConsumerRebalanceListener.onPartitionsAssigned()}} method is invoked.
For example, with {{onPartitionsAssigned()}}:
* {{LegacyKafkaConsumer}}: the listener method is invoked when the consumer
joins the group, even if that consumer was not assigned any partitions. In this
case it's passed an empty list.
* {{AsyncKafkaConsumer}}: the listener method is invoked after the consumer
joins the group only if it has been assigned partitions.
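
The two behaviors listed above can be modeled with a small, dependency-free sketch. This is not Kafka's implementation: {{AssignmentListener}}, {{notifyAlways}}, {{notifyOnlyIfNonEmpty}} and {{countCallbacks}} are hypothetical names made up for illustration, and partitions are represented as plain strings. It only shows how the two invocation policies diverge when some consumers receive an empty assignment.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical model for illustration only; none of these names are Kafka APIs.
class RebalanceCallbackSketch {

    /** Stand-in for ConsumerRebalanceListener.onPartitionsAssigned(). */
    interface AssignmentListener {
        void onPartitionsAssigned(Collection<String> partitions);
    }

    /** Legacy-style policy: always notify after a join, even if nothing was assigned. */
    static void notifyAlways(AssignmentListener listener, Collection<String> assigned) {
        listener.onPartitionsAssigned(assigned);
    }

    /** Async-style policy as described in this ticket: notify only for a non-empty assignment. */
    static void notifyOnlyIfNonEmpty(AssignmentListener listener, Collection<String> assigned) {
        if (!assigned.isEmpty()) {
            listener.onPartitionsAssigned(assigned);
        }
    }

    /** Counts how many times the callback fires across the given per-consumer assignments. */
    static int countCallbacks(List<List<String>> assignments, boolean legacyStyle) {
        List<Collection<String>> seen = new ArrayList<>();
        AssignmentListener recorder = seen::add;
        for (List<String> assigned : assignments) {
            if (legacyStyle) {
                notifyAlways(recorder, assigned);
            } else {
                notifyOnlyIfNonEmpty(recorder, assigned);
            }
        }
        return seen.size();
    }

    public static void main(String[] args) {
        // Three consumers sharing one single-partition topic: only one gets the partition.
        List<List<String>> assignments = List.of(
                List.<String>of(), List.of("test_topic-0"), List.<String>of());
        System.out.println("legacy-style callbacks: " + countCallbacks(assignments, true));  // 3
        System.out.println("async-style callbacks:  " + countCallbacks(assignments, false)); // 1
    }
}
```

Under the first policy every consumer that joins produces a callback, so a caller can treat the callback as a "joined the group" signal; under the second policy that signal is missing for consumers with nothing assigned.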
This difference is affecting the system tests. The system tests use a Java
class named {{VerifiableConsumer}} which uses a {{ConsumerRebalanceListener}}
that logs when the callbacks are invoked. The system tests then read from that
log to determine when the callbacks are invoked. This coordination is used by
the system tests to determine the lifecycle and status of the consumers.
The system tests rely heavily on the listener behavior of the
{{LegacyKafkaConsumer}}. It invokes the {{onPartitionsAssigned()}} method when
the consumer joins the group, and the system tests use that to determine when
the consumer is actively a member of the group. This validation of membership
is used as an assertion throughout the consumer-related tests.
In the system test I'm executing from {{consumer_test.py}}, there's a test that
creates three consumers to read from a single topic with a single partition.
It's a bit of an oddball test, but it demonstrates the issue.
Here are the logs pulled from the test run when executed using the
{{LegacyKafkaConsumer}}:
Node 1:
{code:java}
[2024-02-15 00:43:52,400] INFO Adding newly assigned partitions:
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
Node 2:
{code:java}
[2024-02-15 00:43:52,401] INFO Adding newly assigned partitions: test_topic-0
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
Node 3:
{code:java}
[2024-02-15 00:43:52,399] INFO Adding newly assigned partitions:
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
Here are the logs when executing the same test using the {{AsyncKafkaConsumer}}:
Node 1:
{code:java}
[2024-02-15 01:15:46,576] INFO Adding newly assigned partitions: test_topic-0
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
Node 2:
{code:java}n/a{code}
Node 3:
{code:java}n/a{code}
As a result of this change, the existing system tests do not work with the new
consumer. However, even more importantly, this change in behavior may adversely
affect existing users.
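
To make the distinction in the log excerpts above concrete, here is a rough sketch of how a log line with an empty assignment differs from no log line at all. It is not the parsing used by the system tests; {{AssignmentLogSketch}} and {{parseAssignment}} are hypothetical names, and the sketch assumes each log entry is on a single line, that the logger name follows in parentheses, and that multiple partitions would be comma-separated.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Kafka or its system tests.
class AssignmentLogSketch {
    private static final String MARKER = "Adding newly assigned partitions:";

    /**
     * Returns Optional.empty() when the log contains no assignment line (no callback logged),
     * otherwise the possibly empty list of partitions from the first such line.
     */
    static Optional<List<String>> parseAssignment(String log) {
        for (String line : log.split("\n")) {
            int idx = line.indexOf(MARKER);
            if (idx < 0) {
                continue;
            }
            String rest = line.substring(idx + MARKER.length());
            // Drop the trailing "(logger.name)" suffix, assumed to follow the partition list.
            int paren = rest.indexOf('(');
            if (paren >= 0) {
                rest = rest.substring(0, paren);
            }
            rest = rest.trim();
            if (rest.isEmpty()) {
                return Optional.of(List.of()); // joined the group, nothing assigned
            }
            return Optional.of(Arrays.stream(rest.split(","))
                    .map(String::trim)
                    .collect(Collectors.toList()));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String logger = "(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker)";
        String emptyAssignment = "[2024-02-15 00:43:52,400] INFO Adding newly assigned partitions:  " + logger;
        String onePartition = "[2024-02-15 00:43:52,401] INFO Adding newly assigned partitions: test_topic-0 " + logger;
        System.out.println(parseAssignment(emptyAssignment));
        System.out.println(parseAssignment(onePartition));
        System.out.println(parseAssignment("n/a"));
    }
}
```

A check built this way can tell "joined with an empty assignment" apart from "never notified", which is the signal that is absent on Nodes 2 and 3 in the {{AsyncKafkaConsumer}} run.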