[
https://issues.apache.org/jira/browse/KAFKA-19356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lianet Magrans updated KAFKA-19356:
-----------------------------------
Labels: consumer-threading-refactor kip-848-client-support (was: )
> AsyncConsumer should ensure consistency of assigned partitions and
> subscription
> -------------------------------------------------------------------------------
>
> Key: KAFKA-19356
> URL: https://issues.apache.org/jira/browse/KAFKA-19356
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
> Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.1.0
>
>
> With the new AsyncConsumer, the client reconciles partitions assigned by the
> coordinator, and does not perform any validation against the subscription
> (letting the coordinator drive, and expecting that subscription/assignments
> should become consistent on the next HB req/resp round).
> Still, this allows for a window of time where they would not be consistent on
> the client, and the main risk is related to fetching I would expect: a
> subscription change will clear the fetch buffer in the async consumer, but
> won't prevent processing fetch responses for previous requests or issuing new
> ones for an assigned partition that is not in the subscription anymore?
> (fetched records are only discarded
> [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L153-L159]
> if the partition is not assigned or not fetchable, so that would prevent
> this risk from what I can see, could be missing something).
> The Classic consumer ensures it does not maintain/fetch assigned partitions
> that are not in the subscription because:
> * classicConsumer does not take assignments received from the coordinator
> that are not in the subscription ("ignores"/rejoins if it gets them)
> ([here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L410C28-L410C62]).
> For this case, we could consider doing it in the AsyncConsumer similarly,
> maybe not reconciling assignments received in HB response but not in the
> subscription.
> * classicConsumer pro-actively revokes assigned partitions not in the
> subscription on every onJoinPrepare (blocking, updating the assignment before
> fetching,
> [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L838-L849]).
> For this case we need to consider a bit more. We surely want to keep the
> client away from driving assignment logic to keep it only on the coordinator,
> but if we say we're waiting for the coordinator to revoke that partition (on
> the next HB req/resp round), we still need to ensure on the client that we
> don't consider such partition "fetchable", and that we discard fetch
> responses for it.
> Note that these validations could be applied on the client for explicit
> subscription or client side regex only. In the case of broker-side regex the
> client cannot perform any validation given that it never computes the regex
> (and should not).
--
This message was sent by Atlassian Jira
(v8.20.10#820010)