lianetm commented on PR #15215: URL: https://github.com/apache/kafka/pull/15215#issuecomment-1904663735
Hey @dajac, I made the changes discussed above: the fix now uses an internal flag instead, which only affects the fetchable and initializing-positions states. A couple of comments:

1. Regarding the use of assignedPartitions here (instead of addedPartitions): I initially went with addedPartitions too, thinking we could keep fetching from the previously owned partitions, but then switched to assignedPartitions to preserve how the legacy logic behaves (it effectively stops fetching from all partitions, given that the callback blocks the polling loop). The onPartitionsAssigned callback also receives the full set of assignedPartitions, so we can expect users to potentially adjust positions for those as well.

2. Regarding the initializing-offsets part: the async consumer executes callbacks and updates positions sequentially in the application thread (it executes the callback first [here](https://github.com/apache/kafka/blob/62ce551826192ef6137bc3ce670277f79bd3dee2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1632), then updates positions). Still, the intention there is not very clear, since it is processing multiple background events, so I added the changes in the core subscription state, with the explicit intent of not initializing offsets while a partition is involved in the callback, consistent with the way we prevent fetching.
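To make the intent of the flag concrete, here is a minimal standalone sketch of the idea, not the code in this PR. The class `SubscriptionStateSketch`, its method names, and the use of plain strings for partitions are all assumptions made for illustration; only the behavior (flagged partitions are neither fetchable nor eligible for position initialization until the callback completes) comes from the description above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the consumer's subscription state; not Kafka's actual class.
class SubscriptionStateSketch {

    private static final class PartitionState {
        boolean pendingCallback; // internal flag: onPartitionsAssigned has not completed yet
        Long position;           // null until a position is set or initialized
    }

    private final Map<String, PartitionState> assignment = new HashMap<>();

    // Apply a new assignment and flag the FULL assigned set (not only the newly added
    // partitions), mirroring the legacy behavior where the callback blocks all fetching.
    void assignAwaitingCallback(Set<String> assignedPartitions) {
        assignment.keySet().retainAll(assignedPartitions);
        for (String tp : assignedPartitions) {
            assignment.computeIfAbsent(tp, k -> new PartitionState()).pendingCallback = true;
        }
    }

    // Clear the flag once the callback has finished for these partitions.
    void enableAfterCallback(Set<String> partitions) {
        for (String tp : partitions) {
            PartitionState state = assignment.get(tp);
            if (state != null) {
                state.pendingCallback = false;
            }
        }
    }

    // The user may set a position from inside the callback, so this ignores the flag.
    void seek(String tp, long offset) {
        PartitionState state = assignment.get(tp);
        if (state == null) {
            throw new IllegalStateException("Partition not assigned: " + tp);
        }
        state.position = offset;
    }

    // Fetchable only when assigned, not awaiting the callback, and positioned.
    boolean isFetchable(String tp) {
        PartitionState state = assignment.get(tp);
        return state != null && !state.pendingCallback && state.position != null;
    }

    // Positions are initialized only for partitions that are not awaiting the callback.
    boolean shouldInitializePosition(String tp) {
        PartitionState state = assignment.get(tp);
        return state != null && !state.pendingCallback && state.position == null;
    }
}
```

With this shape, a position set by the user during the callback is kept once the flag is cleared, while partitions the user did not touch fall through to the normal position-initialization path afterwards.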
