[
https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lianet Magrans updated KAFKA-16665:
-----------------------------------
Description:
If we attempt to call consumer.position(tp) from within the
onPartitionsAssigned callback, the new consumer fails with a TimeoutException.
The expectation is that we should be able to retrieve the position of newly
assigned partitions, as it happens with the legacy consumer, that allows this
call. This is actually used from places within Kafka itself (ex. Connect
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
The failure in the new consumer is because the partitions that are assigned but
awaiting the onPartitionsAssigned callback, are excluded from the list of
partitions that should initialize. We should allow the partitions to initialize
their positions, without allowing to fetch data from them (which is already
achieve based on the isFetchable flag in the subscription state).
Note that a partition position can be updated from 2 places: call to
consumer.position or call to consumer.poll. Both will attempt to
`updateFetchPositions` if there is no valid position yet, but even after having
a valid position after those calls, the partition will remain non-fetchable
until the onPartitionsAssigned callback completes (fetchable considers that the
partitions has a valid position AND is not awaiting the callback)
was:
If we attempt to call consumer.position(tp) from within the
onPartitionsAssigned callback, the new consumer fails with a TimeoutException.
The expectation is that we should be able to retrieve the position of newly
assigned partitions, as it happens with the legacy consumer, that allows this
call. This is actually used from places within Kafka itself (ex. Connect
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
The failure in the new consumer is because the partitions that are assigned but
awaiting the onPartitionsAssigned callback, are excluded from the list of
partitions that should initialize. We should allow the partitions to initialize
their positions, without allowing to fetch data from them (which is already
achieve based on the isFetchable flag in the subscription state).
> Fail to get partition's position from within onPartitionsAssigned callback in
> new consumer
> -------------------------------------------------------------------------------------------
>
> Key: KAFKA-16665
> URL: https://issues.apache.org/jira/browse/KAFKA-16665
> Project: Kafka
> Issue Type: Task
> Components: clients, consumer
> Affects Versions: 3.7.0
> Reporter: Lianet Magrans
> Assignee: Lianet Magrans
> Priority: Major
> Labels: kip-848-client-support
> Fix For: 3.8.0
>
>
> If we attempt to call consumer.position(tp) from within the
> onPartitionsAssigned callback, the new consumer fails with a
> TimeoutException. The expectation is that we should be able to retrieve the
> position of newly assigned partitions, as it happens with the legacy
> consumer, that allows this call. This is actually used from places within
> Kafka itself (ex. Connect
> [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
>
> The failure in the new consumer is because the partitions that are assigned
> but awaiting the onPartitionsAssigned callback, are excluded from the list of
> partitions that should initialize. We should allow the partitions to
> initialize their positions, without allowing to fetch data from them (which
> is already achieve based on the isFetchable flag in the subscription state).
> Note that a partition position can be updated from 2 places: call to
> consumer.position or call to consumer.poll. Both will attempt to
> `updateFetchPositions` if there is no valid position yet, but even after
> having a valid position after those calls, the partition will remain
> non-fetchable until the onPartitionsAssigned callback completes (fetchable
> considers that the partitions has a valid position AND is not awaiting the
> callback)
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)