Lianet Magrans created KAFKA-19356:
--------------------------------------
Summary: AsyncConsumer should ensure consistency of assigned
partitions and subscription
Key: KAFKA-19356
URL: https://issues.apache.org/jira/browse/KAFKA-19356
Project: Kafka
Issue Type: Bug
Components: clients, consumer
Reporter: Lianet Magrans
Assignee: Lianet Magrans
Fix For: 4.1.0
With the new AsyncConsumer, the client reconciles partitions assigned by the
coordinator and does not perform any validation against the subscription. It
lets the coordinator drive, expecting the subscription and assignment to become
consistent on the next heartbeat (HB) request/response round.
Still, this leaves a window of time in which they are not consistent on the
client. I would expect the main risk to be related to fetching: a subscription
change will clear the fetch buffer in the async consumer, but won't prevent
processing fetch responses to earlier requests, or issuing new ones, for an
assigned partition that is no longer in the subscription?
(Fetched records are only discarded
[here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L153-L159]
if the partition is not assigned or not fetchable, so from what I can see that
check would not prevent this risk, though I could be missing something.)
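To make the gap concrete, here is a minimal sketch that places the discard condition described above next to a subscription-aware variant. It is not the actual FetchCollector code. TopicPartition is a simplified stand-in for the Kafka class, and FetchDiscardSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.Set;

// Simplified stand-in for org.apache.kafka.common.TopicPartition (assumption for this sketch).
record TopicPartition(String topic, int partition) {}

// Hypothetical model of the discard decision; not the real FetchCollector logic.
final class FetchDiscardSketch {

    // Mirrors the condition described above: records are dropped only when the
    // partition is not assigned or not fetchable.
    static boolean discardCurrentRule(TopicPartition tp,
                                      Set<TopicPartition> assigned,
                                      Set<TopicPartition> fetchable) {
        return !assigned.contains(tp) || !fetchable.contains(tp);
    }

    // Hypothetical stricter rule: also drop records for a partition whose topic
    // is no longer in an explicit (or client-side regex) subscription.
    static boolean discardWithSubscriptionCheck(TopicPartition tp,
                                                Set<TopicPartition> assigned,
                                                Set<TopicPartition> fetchable,
                                                Set<String> subscribedTopics) {
        return discardCurrentRule(tp, assigned, fetchable)
                || !subscribedTopics.contains(tp.topic());
    }
}
```

Consider a partition of "orders" that is still assigned and fetchable after the subscription changed to "payments". The current-style rule keeps its records, and the stricter rule drops them.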
The Classic consumer ensures it does not maintain/fetch assigned partitions
that are not in the subscription because:
* classicConsumer does not accept assignments received from the coordinator that
are not in the subscription (it ignores them and rejoins if it gets them)
([here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L410C28-L410C62]).
For this case, we could consider doing something similar in the AsyncConsumer,
for example not reconciling assignments received in an HB response that are not
in the subscription.
* classicConsumer proactively revokes assigned partitions not in the
subscription on every onJoinPrepare (blocking, and updating the assignment before
fetching,
[here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L838-L849]).
This case needs a bit more thought. We certainly want to keep assignment logic
on the coordinator only, rather than having the client drive it. But if we say
we're waiting for the coordinator to revoke that partition (on the next HB
request/response round), we still need to ensure on the client that we don't
consider such a partition "fetchable", and that we discard fetch responses
for it.
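Here is a rough sketch of both ideas, for illustration only. The class and method names are hypothetical, and TopicPartition is a simplified stand-in for the Kafka class. The target assignment is modelled as topic name to partitions. As far as I know, the real HB response identifies topics by ID, so names would first have to be resolved from metadata. The client does not revoke anything here. Partitions are only left out of reconciliation and out of the fetchable set, and revocation stays with the coordinator.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Simplified stand-in for org.apache.kafka.common.TopicPartition (assumption for this sketch).
record TopicPartition(String topic, int partition) {}

// Hypothetical helper, not part of the Kafka client API.
final class AssignmentSubscriptionSketch {

    // Idea 1: only reconcile the part of the target assignment (topic name ->
    // partition ids) whose topics are in the current subscription; the rest is
    // ignored until the coordinator sends a consistent assignment.
    static Map<String, Set<Integer>> reconcilable(Map<String, Set<Integer>> target,
                                                  Set<String> subscribedTopics) {
        Map<String, Set<Integer>> result = new TreeMap<>();
        target.forEach((topic, partitions) -> {
            if (subscribedTopics.contains(topic)) {
                result.put(topic, Set.copyOf(partitions));
            }
        });
        return result;
    }

    // Idea 2: partitions stay assigned until the coordinator revokes them, but
    // those whose topic is no longer subscribed are not considered fetchable.
    static Set<TopicPartition> fetchable(Set<TopicPartition> assigned,
                                         Set<String> subscribedTopics) {
        return assigned.stream()
                .filter(tp -> subscribedTopics.contains(tp.topic()))
                .collect(Collectors.toSet());
    }
}
```

The filtering is purely local and read-only. The assignment itself is never modified by the client, so the coordinator remains the only component driving assignment changes.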
Note that these validations could be applied on the client for explicit
subscriptions or client-side regex only. In the case of broker-side regex, the
client cannot perform any validation, given that it never computes the regex
(and should not).
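Here is a minimal sketch of how client-side validation could be gated on the subscription type. It assumes a hypothetical Subscription model with one record per kind, and none of these types exist in the Kafka client. An empty result means the client cannot tell and should defer to the coordinator. That is the broker-side regex case.

```java
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical model of the three subscription kinds (not Kafka client types).
interface Subscription {}
record ExplicitTopics(Set<String> topics) implements Subscription {}
record ClientSideRegex(Pattern pattern) implements Subscription {}
record BrokerSideRegex(String regex) implements Subscription {}

final class SubscriptionValidationSketch {

    // Returns whether the client can decide locally that a topic is in the
    // subscription. Empty means "cannot tell": with a broker-side regex the
    // client never evaluates the regex, so it must defer to the coordinator.
    static Optional<Boolean> topicInSubscription(Subscription sub, String topic) {
        if (sub instanceof ExplicitTopics e) {
            return Optional.of(e.topics().contains(topic));
        } else if (sub instanceof ClientSideRegex c) {
            return Optional.of(c.pattern().matcher(topic).matches());
        } else {
            return Optional.empty();
        }
    }
}
```

Callers would apply the checks from the earlier sketches only when the result is present.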