lucasbru commented on code in PR #15742:
URL: https://github.com/apache/kafka/pull/15742#discussion_r1570342805
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -228,13 +228,16 @@ private void process(final ErrorEvent event) {
}
private void process(final
ConsumerRebalanceListenerCallbackNeededEvent event) {
- ApplicationEvent invokedEvent = invokeRebalanceCallbacks(
+ ConsumerRebalanceListenerCallbackCompletedEvent invokedEvent =
invokeRebalanceCallbacks(
rebalanceListenerInvoker,
event.methodName(),
event.partitions(),
event.future()
);
applicationEventHandler.add(invokedEvent);
+ if (invokedEvent.error().isPresent()) {
+ throw invokedEvent.error().get();
Review Comment:
Looking at the reconciliation logic, I think if `onPartitionsRevoked`
throws, we'll not execute `onPartitionsAssigned`. And the call to
`onPartitionsLost` seems to be independent of reconciliation. So not sure how
we'd end up with two exceptions.
You are right that there is a behavioral difference around finishing the
reconciliation. The old consumer throws _after_ finishing the reconciliation,
while the new consumer throws on a different thread, so there is no strict time
ordering between finishing the reconciliation and throwing. But I'm struggling
to see how one can observe the difference. The reconciliation will have
finished the next time the background thread processes any events, so in a
sense, you cannot observe the difference based on the queue architecture. The
difference may only be observable through shared state that breaks the
queue-based architecture. SubscriptionState comes to mind here. Thinking of
something like
1. application thread enters poll, fails during rebalance listener execution
and throws
2. application thread somehow reads subscription state
3. background thread updates subscription state as part of reconciliation
Now the application thread has observed an "incomplete reconciliation". But
after a listener execution has failed, we don't seem to update the subscription
state in the reconciliation.
So in summary - not sure if we are going to notice the different behaviors?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]