[ https://issues.apache.org/jira/browse/KAFKA-19356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17955675#comment-17955675 ]
Lianet Magrans commented on KAFKA-19356:
----------------------------------------
Hey [~kirktrue], the important distinction here is between assigned partitions
and the subscription.
{quote}The link to {{FetchCollector.fetchRecords()}} you included is designed
to catch the case where a topic/partition is unassigned
{quote}
Exactly, but it won't catch the case where an assigned partition is no longer
in the subscription (so I expect the description is accurate, correct?
"won't prevent processing fetch responses for previous requests or issuing new
ones for an *assigned partition* that is *not in the subscription* anymore").
Just to make sure we're on the same page about there being a gap: I expect we
need to make the assignment path on the client consistent for new
assignments, but we still need to define the best way to handle the case where
the assignment does not change and it's the subscription that changes (the
challenge being to keep the clients simple).
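Here is a minimal Java sketch of that gap. It does not use the real `FetchCollector`. `TopicPartition` is a local record, not Kafka's class. `keepCurrent`, `keepStrict` and the `assigned`, `fetchable` and `subscribedTopics` collections are hypothetical stand-ins for client state, assuming an explicit topic subscription. `keepCurrent` models the linked `FetchCollector.fetchRecords()` check as described in the issue: records are discarded only if the partition is not assigned or not fetchable. `keepStrict` is a hypothetical variant that also drops records for topics no longer in the subscription.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class FetchFilterSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    // Models the check described in the issue: records are kept as long as
    // the partition is still assigned and fetchable.
    static boolean keepCurrent(TopicPartition tp, Set<TopicPartition> assigned,
                               Set<TopicPartition> fetchable) {
        return assigned.contains(tp) && fetchable.contains(tp);
    }

    // Hypothetical stricter check: also require the topic to still be part
    // of the (explicit) subscription.
    static boolean keepStrict(TopicPartition tp, Set<TopicPartition> assigned,
                              Set<TopicPartition> fetchable, Set<String> subscribedTopics) {
        return keepCurrent(tp, assigned, fetchable) && subscribedTopics.contains(tp.topic());
    }

    public static void main(String[] args) {
        TopicPartition orders0 = new TopicPartition("orders", 0);
        TopicPartition payments0 = new TopicPartition("payments", 0);

        // Both partitions are still assigned and fetchable (the coordinator has
        // not revoked "payments" yet), but the subscription now only has "orders".
        Set<TopicPartition> assigned = Set.of(orders0, payments0);
        Set<TopicPartition> fetchable = Set.of(orders0, payments0);
        Set<String> subscribed = Set.of("orders");

        // Partitions for which a fetch response is in flight.
        List<TopicPartition> inFlight = List.of(orders0, payments0);

        List<TopicPartition> keptCurrent = inFlight.stream()
                .filter(tp -> keepCurrent(tp, assigned, fetchable))
                .collect(Collectors.toList());
        List<TopicPartition> keptStrict = inFlight.stream()
                .filter(tp -> keepStrict(tp, assigned, fetchable, subscribed))
                .collect(Collectors.toList());

        System.out.println("current: " + keptCurrent);
        System.out.println("strict:  " + keptStrict);
    }
}
```

In this scenario `keepCurrent` still returns records for `payments`, even though it left the subscription. `keepStrict` discards them.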
> AsyncConsumer should ensure consistency of assigned partitions and
> subscription
> -------------------------------------------------------------------------------
>
> Key: KAFKA-19356
> URL: https://issues.apache.org/jira/browse/KAFKA-19356
> Project: Kafka
> Issue Type: Bug
> Components: clients, consumer
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
> Labels: consumer-threading-refactor, kip-848-client-support
> Fix For: 4.1.0
>
>
> With the new AsyncConsumer, the client reconciles partitions assigned by the
> coordinator, and does not perform any validation against the subscription
> (letting the coordinator drive, and expecting the subscription and assignment
> to become consistent on the next HB req/resp round).
> Still, this leaves a window of time during which they are not consistent on
> the client, and I would expect the main risk to be related to fetching: a
> subscription change will clear the fetch buffer in the async consumer, but
> won't prevent processing fetch responses for previous requests or issuing new
> ones for an assigned partition that is not in the subscription anymore?
> (fetched records are only discarded
> [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java#L153-L159]
> if the partition is not assigned or not fetchable, so from what I can see that
> would not prevent this risk, though I could be missing something).
> The Classic consumer ensures in two ways that it does not keep or fetch
> assigned partitions that are not in the subscription:
> * classicConsumer does not accept assignments received from the coordinator
> that are not in the subscription (it "ignores" them and rejoins if it gets them)
> ([here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L410C28-L410C62]).
> For this case, we could consider doing something similar in the AsyncConsumer,
> maybe not reconciling assignments received in the HB response that are not in
> the subscription.
> * classicConsumer proactively revokes assigned partitions not in the
> subscription on every onJoinPrepare (blocking, updating the assignment before
> fetching,
> [here|https://github.com/apache/kafka/blob/cc0f06554bacc9fe086c4f4c5ecee1b5dc75ae9b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L838-L849]).
> This case needs more consideration. We certainly want to keep assignment
> logic out of the client and only on the coordinator, but if we say we're
> waiting for the coordinator to revoke that partition (on the next HB req/resp
> round), we still need to ensure on the client that we don't consider such a
> partition "fetchable", and that we discard fetch responses for it.
> Note that these validations could be applied on the client only for explicit
> subscriptions or client-side regex. In the case of broker-side regex, the
> client cannot perform any validation, given that it never computes the regex
> (and should not).
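Below is a minimal Java sketch of the two client-side safeguards from the description, made aware of the subscription mode. Everything in it is hypothetical. `TopicPartition` is a local record rather than Kafka's class. `Subscription`, `isSubscribed`, `assignmentMatchesSubscription` and `partitionsToTreatAsNotFetchable` are illustrative names, not existing consumer APIs. Client-side regex matching is assumed to be a full match via `Pattern.matcher(...).matches()`. An `Optional.empty()` result models the broker-side regex case, where the client cannot decide locally.

```java
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class SubscriptionConsistencySketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    record TopicPartition(String topic, int partition) { }

    // Hypothetical model of the three subscription modes.
    interface Subscription { }
    record Explicit(Set<String> topics) implements Subscription { }
    record ClientRegex(Pattern pattern) implements Subscription { }
    record BrokerRegex(String expression) implements Subscription { }

    // Whether the client can tell locally that a topic is subscribed.
    // Empty means "unknown": with broker-side regex the client never
    // computes the regex, so it has to defer to the coordinator.
    static Optional<Boolean> isSubscribed(Subscription s, String topic) {
        if (s instanceof Explicit e) {
            return Optional.of(e.topics().contains(topic));
        }
        if (s instanceof ClientRegex c) {
            return Optional.of(c.pattern().matcher(topic).matches());
        }
        return Optional.empty();
    }

    // Safeguard 1 (classic-style): does the target assignment only contain
    // partitions of subscribed topics? If not, an async consumer could skip
    // reconciling it. Unknown topics are treated as subscribed.
    static boolean assignmentMatchesSubscription(Set<TopicPartition> target, Subscription s) {
        return target.stream()
                .allMatch(tp -> isSubscribed(s, tp.topic()).orElse(true));
    }

    // Safeguard 2: keep the assignment until the coordinator revokes it, but
    // return the partitions the client should treat as not fetchable (and
    // whose in-flight fetch responses it should discard) in the meantime.
    static Set<TopicPartition> partitionsToTreatAsNotFetchable(Set<TopicPartition> assigned,
                                                               Subscription s) {
        return assigned.stream()
                .filter(tp -> !isSubscribed(s, tp.topic()).orElse(true))
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Set<TopicPartition> assigned = Set.of(
                new TopicPartition("orders", 0),
                new TopicPartition("payments", 1));

        Subscription[] modes = {
                new Explicit(Set.of("orders")),
                new ClientRegex(Pattern.compile("orders.*")),
                new BrokerRegex("orders.*")
        };
        for (Subscription s : modes) {
            System.out.println(s
                    + " matches=" + assignmentMatchesSubscription(assigned, s)
                    + " notFetchable=" + partitionsToTreatAsNotFetchable(assigned, s));
        }
    }
}
```

Treating an unknown result as subscribed (`orElse(true)`) is only one possible design choice. It leaves the broker-side regex case entirely to the coordinator, which matches the point that the client should not compute that regex.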
--
This message was sent by Atlassian Jira
(v8.20.10#820010)