Kirk True created KAFKA-16312:
---------------------------------

             Summary: ConsumerRebalanceListener.onPartitionsAssigned should be 
called after joining, even if empty
                 Key: KAFKA-16312
                 URL: https://issues.apache.org/jira/browse/KAFKA-16312
             Project: Kafka
          Issue Type: Bug
          Components: clients, consumer
    Affects Versions: 3.7.0
            Reporter: Kirk True
            Assignee: Lianet Magrans
             Fix For: 3.8.0


There is a difference between the {{LegacyKafkaConsumer}} and 
{{AsyncKafkaConsumer}} respecting when the 
{{ConsumerRebalanceListener.onPartitionsAssigned()}} method is invoked.

 
For example, with {{onPartitionsAssigned()}}:

* {{LegacyKafkaConsumer}}: the listener method is invoked when the consumer 
joins the group, even if that consumer was not assigned any partitions. In this 
case it's passed an empty list.
* {{AsyncKafkaConsumer}}: the listener method is only invoked after the 
consumer joins the group iff it has assigned partitions

 
This difference is affecting the system tests. The system tests use a Java 
class named {{VerifiableConsumer}} which uses a {{ConsumerRebalanceListener}} 
that logs when the callbacks are invoked. The system tests then read from that 
log to determine when the callbacks are invoked. This coordination is used by 
the system tests to determine the lifecycle and status of the consumers.
 
The system tests rely heavily on the listener behavior of the 
{{LegacyKafkaConsumer}}. It invokes the {{onPartitionsAssigned()}} method when 
the consumer joins the group, and the system tests use that to determine when 
the consumer is actively a member of the group. This validation of membership 
is used as an assertion throughout the consumer-related tests.
 
In the system test I'm executing from {{consumer_test.py}}, there's a test that 
creates three consumers to read from a single topic with a single partition. 
It's a bit of an oddball test, but it demonstrates the issue.
 
Here are the logs pulled from the test run when executed using the 
{{LegacyKafkaConsumer}}:
 
Node 1:
 
{code:java}
[2024-02-15 00:43:52,400] INFO Adding newly assigned partitions:  
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
 
Node 2:
 
{code:java}
[2024-02-15 00:43:52,401] INFO Adding newly assigned partitions: test_topic-0 
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}
 
Node 3:
 
{code:java}
[2024-02-15 00:43:52,399] INFO Adding newly assigned partitions:  
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}

Here are the logs when executing the same test using the {{AsyncKafkaConsumer}}:

Node 1:

{code:java}
[2024-02-15 01:15:46,576] INFO Adding newly assigned partitions: test_topic-0 
(org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker){code}

Node 2:

{code:java}n/a{code}

Node 3:

{code:java}n/a{code}

As a result of this change, the existing system tests do not work with the new 
consumer. However, even more importantly, this change in behavior may adversely 
affect existing users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to