lucasbru commented on code in PR #16272:
URL: https://github.com/apache/kafka/pull/16272#discussion_r1638000934
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1278,12 +1278,10 @@ void prepareShutdown(final Timer timer, final
AtomicReference<Throwable> firstEx
autoCommitSync(timer);
applicationEventHandler.add(new CommitOnCloseEvent());
- completeQuietly(
- () -> {
- maybeRevokePartitions();
- applicationEventHandler.addAndGet(new
LeaveOnCloseEvent(calculateDeadlineMs(timer)));
- },
- "Failed to send leaveGroup heartbeat with a timeout(ms)=" +
timer.timeoutMs(), firstException);
+ completeQuietly(() -> maybeRevokePartitions(),
Review Comment:
So the point here seems to be, if `maybeRevokePartitions` fails, likely due
to rebalancelistener, still leave the group.
It looks like a good change to me. I'm just surprised to see that the legacy
consumer does not seem to do this? If `onPrepareLeave` in `AbstractCoordinator`
fails, we won't reach `maybeLeaveGroup`.
So is this a bug also in the legacy consumer?
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java:
##########
@@ -158,7 +158,12 @@ public class AsyncKafkaConsumerTest {
public void resetAll() {
backgroundEventQueue.clear();
if (consumer != null) {
- consumer.close(Duration.ZERO);
+ try {
+ consumer.close(Duration.ZERO);
+ } catch (Exception e) {
+ // best effort to clean up after each test, but may throw (ex.
if callbacks where
+ // throwing errors)
Review Comment:
I did this before but was asked to not do it, see
https://github.com/apache/kafka/pull/15613/files/9fb917e4b1e60f238183c92d1ad3bc2565a7e1ea#r1559907295.
That's why I added a "clean-up close" in the tests where close fails, with an
expected exception (search for "clean-up" in this file).
I'd also be fine with your (and my original approach) to have a best-effort
clean up and ignore exceptions here. But then, let's remove the "clean-up
close" code in the other tests. Any consistent approach is fine with me here.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MembershipManagerImpl.java:
##########
@@ -673,7 +677,15 @@ public CompletableFuture<Void> leaveGroup() {
CompletableFuture<Void> callbackResult =
invokeOnPartitionsRevokedOrLostToReleaseAssignment();
callbackResult.whenComplete((result, error) -> {
+ if (error != null) {
+ log.error("Member {} callback to release assignment failed.
Member will proceed " +
+ "to send leave group heartbeat", memberId, error);
+ } else {
+ log.debug("Member {} completed callback to release assignment
and will send leave " +
+ "group heartbeat", memberId);
+ }
// Clear the subscription, no matter if the callback execution
failed or succeeded.
+ subscriptions.unsubscribe();
clearSubscription();
Review Comment:
Is the name `clearSubscription` misleading? It seems like it clears the
assignment, not the subscription.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1324,6 +1322,7 @@ void completeQuietly(final Utils.ThrowingRunnable
function,
} catch (TimeoutException e) {
log.debug("Timeout expired before the {} operation could
complete.", msg);
Review Comment:
Should we update `firstException` here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]