[ 
https://issues.apache.org/jira/browse/KAFKA-16665?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lianet Magrans updated KAFKA-16665:
-----------------------------------
    Description: 
If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as it happens with the legacy consumer, that allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback, are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing to fetch data from them (which is already 
achieve based on the isFetchable flag in the subscription state).

Note that a partition position can be updated from 2 places: call to 
consumer.position or call to consumer.poll. Both will attempt to 
`updateFetchPositions` if there is no valid position yet, but even after having 
a valid position after those calls, the partition will remain non-fetchable 
until the onPartitionsAssigned callback completes (fetchable considers that the 
partitions has a valid position AND is not awaiting the callback)

  

  was:
If we attempt to call consumer.position(tp) from within the 
onPartitionsAssigned callback, the new consumer fails with a TimeoutException. 
The expectation is that we should be able to retrieve the position of newly 
assigned partitions, as it happens with the legacy consumer, that allows this 
call. This is actually used from places within Kafka itself (ex. Connect 
[WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
 

The failure in the new consumer is because the partitions that are assigned but 
awaiting the onPartitionsAssigned callback, are excluded from the list of 
partitions that should initialize. We should allow the partitions to initialize 
their positions, without allowing to fetch data from them (which is already 
achieve based on the isFetchable flag in the subscription state).

  


> Fail to get partition's position from within onPartitionsAssigned callback in 
> new consumer 
> -------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16665
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16665
>             Project: Kafka
>          Issue Type: Task
>          Components: clients, consumer
>    Affects Versions: 3.7.0
>            Reporter: Lianet Magrans
>            Assignee: Lianet Magrans
>            Priority: Major
>              Labels: kip-848-client-support
>             Fix For: 3.8.0
>
>
> If we attempt to call consumer.position(tp) from within the 
> onPartitionsAssigned callback, the new consumer fails with a 
> TimeoutException. The expectation is that we should be able to retrieve the 
> position of newly assigned partitions, as it happens with the legacy 
> consumer, that allows this call. This is actually used from places within 
> Kafka itself (ex. Connect 
> [WorkerSinkTask|https://github.com/apache/kafka/blob/2c0b8b692071b6d436fa7e9dc90a1592476a6b17/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java#L715])
>  
> The failure in the new consumer is because the partitions that are assigned 
> but awaiting the onPartitionsAssigned callback, are excluded from the list of 
> partitions that should initialize. We should allow the partitions to 
> initialize their positions, without allowing to fetch data from them (which 
> is already achieve based on the isFetchable flag in the subscription state).
> Note that a partition position can be updated from 2 places: call to 
> consumer.position or call to consumer.poll. Both will attempt to 
> `updateFetchPositions` if there is no valid position yet, but even after 
> having a valid position after those calls, the partition will remain 
> non-fetchable until the onPartitionsAssigned callback completes (fetchable 
> considers that the partitions has a valid position AND is not awaiting the 
> callback)
>   



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to