dajac commented on PR #15215: URL: https://github.com/apache/kafka/pull/15215#issuecomment-1905503076
Hey @lianetm. Thanks for the update.

> Regarding the use of assignedPartitions here (instead of the addedPartitions). I had initially used the addedPartitions, thinking that we could allow fetching to continue from the previously owned partitions, but then went with the assignedPartitions to keep the behavior of the legacy logic (it effectively stops fetching from all partitions, given that the callback blocks the polling loop). Also, the onPartitionsAssigned callback receives the full set of assignedPartitions as well, so we could expect the user to potentially play with positions for them too.

Hmm... I am not sure about this. Looking at the legacy [code](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L424), it looks like the callback is called with `addedPartitions`. You may have confused it with `invokeOnAssignment`, which provides the full assignment. I think that we should not touch the already assigned partitions.

> Regarding the initializing offsets part, the way the async consumer executes callbacks and updates positions all happens in the application thread sequentially (it executes the callback first [here](https://github.com/apache/kafka/blob/62ce551826192ef6137bc3ce670277f79bd3dee2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1632), then updates positions). Still, the intention there is not very clear, as it is processing multiple background events, so I added the changes in the core subscription state, with the clear intention of not initializing offsets if the partition is involved in the callback, consistent with the way we prevent fetching.

Sounds good.
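To make the `addedPartitions` vs. `assignedPartitions` distinction concrete, here is a minimal sketch of the legacy behavior described above. The class and helper method are hypothetical; only `ConsumerRebalanceListener` and `TopicPartition` are real Kafka types, and the actual logic lives in `ConsumerCoordinator` (see the link above):

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

// Hypothetical, simplified version of the legacy callback invocation.
// onPartitionsAssigned receives only the partitions newly added by the
// rebalance; invokeOnAssignment is the hook that sees the full assignment.
public final class RebalanceCallbackSketch {

    static void invokeOnPartitionsAssigned(ConsumerRebalanceListener listener,
                                           Set<TopicPartition> previouslyOwned,
                                           Set<TopicPartition> newAssignment) {
        // addedPartitions = newAssignment - previouslyOwned
        Set<TopicPartition> addedPartitions = new HashSet<>(newAssignment);
        addedPartitions.removeAll(previouslyOwned);

        // The user callback only sees the newly added partitions, so
        // already-owned partitions (and their positions) are left untouched.
        listener.onPartitionsAssigned(addedPartitions);
    }
}
```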

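And for the offset-initialization point, a hypothetical sketch of the kind of guard described in the comment: skip initializing a partition's position while a rebalance callback involving it is running, mirroring how fetching is prevented. All names here are invented for illustration; the real change lives in Kafka's core subscription state:

```java
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

// Hypothetical guard: positions are not initialized for a partition that is
// still involved in an in-flight rebalance callback.
public final class CallbackAwarePositions {

    private final Set<TopicPartition> partitionsAwaitingCallback = new HashSet<>();

    void markPendingCallback(Set<TopicPartition> partitions) {
        partitionsAwaitingCallback.addAll(partitions);
    }

    void callbackCompleted(Set<TopicPartition> partitions) {
        partitionsAwaitingCallback.removeAll(partitions);
    }

    // Offset initialization is skipped while the callback is still running,
    // consistent with how fetching is prevented for these partitions.
    boolean canInitializePosition(TopicPartition tp) {
        return !partitionsAwaitingCallback.contains(tp);
    }
}
```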