ableegoldman commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2076630475
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1640,33 +1599,6 @@ public synchronized boolean close(final CloseOptions options) throws IllegalArgu
return close(Optional.of(timeoutMs), options.leaveGroup);
}
- private Consumer<StreamThread> streamThreadLeaveConsumerGroup(final long remainingTimeMs) {
Review Comment:
Yeah, so we should basically just leave everything up to the StreamThread's
`completeShutdown` method. It's true that right now the shutdown helper calls
`#streamThreadLeaveConsumerGroup` after the thread has shut down, which
makes it seem like we want the shutdown helper to be responsible for managing
group membership after the thread has died, but that's really just an
artifact of not having had a `consumer#close(CloseOptions)` until now. We had to
wait for the thread to shut down and close the consumer, and only then could we
use the admin client to explicitly remove it from the group. That's not
ideal for a number of reasons (a big one being that it only works for static
members), and it was essentially just a workaround since the consumer couldn't
do this itself.
Now that we have consumers that can handle their own group membership
via `#close`, we don't need this workaround and can leave everything up to the
consumer that the StreamThread owns. In short, instead of refactoring how
`#streamThreadLeaveConsumerGroup` works, we should just remove this
call entirely and let the StreamThread close the consumer with the right
leave-group option in its `#completeShutdown`.
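
To make the shape of this concrete, here's a rough sketch of what I mean. It uses stand-in types only: `SketchConsumer`, `SketchStreamThread`, `MembershipOperation`, and `requestLeaveGroupOnClose` are all hypothetical names for illustration, not the real Kafka classes or signatures. The point is just that the thread records the caller's leave-group choice and passes it to the consumer when it closes it, so nothing outside the thread has to touch group membership afterwards.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the leave/remain choice passed to the consumer at close time.
enum MembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP }

// Hypothetical stand-in for a consumer that handles its own group membership on close.
class SketchConsumer {
    final List<String> events = new ArrayList<>();
    private boolean closed = false;

    void close(final Duration timeout, final MembershipOperation operation) {
        if (closed) {
            return; // closing twice is a no-op in this sketch
        }
        closed = true;
        if (operation == MembershipOperation.LEAVE_GROUP) {
            events.add("leave-group"); // the consumer itself leaves the group
        }
        events.add("closed");
    }
}

// Hypothetical stand-in for StreamThread: it owns the consumer and closes it on shutdown.
class SketchStreamThread {
    private final SketchConsumer consumer;
    private volatile boolean leaveGroupRequested = false;

    SketchStreamThread(final SketchConsumer consumer) {
        this.consumer = consumer;
    }

    // Called by the outer close path to record the caller's choice before shutdown.
    void requestLeaveGroupOnClose() {
        leaveGroupRequested = true;
    }

    // The only place that decides group membership: the thread's own shutdown.
    void completeShutdown(final Duration timeout) {
        final MembershipOperation operation = leaveGroupRequested
            ? MembershipOperation.LEAVE_GROUP
            : MembershipOperation.REMAIN_IN_GROUP;
        consumer.close(timeout, operation);
    }
}
```

With something like this, the outer `close(CloseOptions)` only needs to tell each thread whether to leave the group before signalling shutdown, and the post-shutdown admin-client call goes away.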
Lmk if that makes sense!