chia7712 commented on code in PR #15819:
URL: https://github.com/apache/kafka/pull/15819#discussion_r1584226418
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1469,6 +1469,10 @@ private Thread shutdownHelper(final boolean error, final long timeoutMs, final b
// save the current thread so that if it is a stream thread
// we don't attempt to join it and cause a deadlock
return new Thread(() -> {
+ if (leaveGroup) {
+ processStreamThread(streamThread -> streamThread.setLeaveGroupOnClose(leaveGroup));
Review Comment:
Inside this branch `leaveGroup` is always `true`, so `setLeaveGroupOnClose` could be
renamed to `leaveGroupOnClose` and its argument dropped.
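
A minimal sketch of what the renamed method might look like. `SketchStreamThread` and `SketchConsumer` are hypothetical stand-ins, not the real `StreamThread` or consumer classes; only the method names `leaveGroupOnClose` and `overrideLeaveGroupOnClose` come from this thread.

```java
// Hypothetical stand-in for the consumer; not a real Kafka class.
class SketchConsumer {
    private boolean leaveGroupOnClose = false;

    // Mirrors the no-argument call used in the diff.
    void overrideLeaveGroupOnClose() {
        leaveGroupOnClose = true;
    }

    boolean leavesGroupOnClose() {
        return leaveGroupOnClose;
    }
}

// Hypothetical stand-in for the stream thread; not a real Kafka class.
class SketchStreamThread {
    private final SketchConsumer mainConsumer = new SketchConsumer();

    // No boolean parameter: the caller invokes this only when it wants
    // the thread to leave the group, so the flag would always be true.
    void leaveGroupOnClose() {
        mainConsumer.overrideLeaveGroupOnClose();
    }

    boolean leavesGroupOnClose() {
        return mainConsumer.leavesGroupOnClose();
    }
}
```

With this shape the call site would read `processStreamThread(streamThread -> streamThread.leaveGroupOnClose())`, still guarded by `if (leaveGroup)`.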
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1428,13 +1432,23 @@ private long advanceNowAndComputeLatency() {
*/
public void shutdown() {
log.info("Informed to shut down");
+
+
+
final State oldState = setState(State.PENDING_SHUTDOWN);
if (oldState == State.CREATED) {
// The thread may not have been started. Take responsibility for shutting down
completeShutdown(true);
}
}
+ public void setLeaveGroupOnClose(final boolean leaveGroupOnClose) {
+ if (leaveGroupOnClose) {
+ log.info("Leaving consumer group on close");
+ }
+ ((LegacyKafkaConsumer<?, ?>) mainConsumer).overrideLeaveGroupOnClose();
Review Comment:
If casting is an acceptable approach, we could extend
`LegacyKafkaConsumer#close` to take a `CloseOption` right now.
`LegacyKafkaConsumer` is not a public class, so the change is OK. It would
also be compatible with a new feature in the future.
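
A rough sketch of the idea, under stated assumptions: `CloseOption` is named in this comment but its shape is not defined here, so the enum and its values below are hypothetical, and `SketchLegacyConsumer` is a stand-in rather than the real `LegacyKafkaConsumer`.

```java
import java.time.Duration;

// Hypothetical option type; the values are assumptions for illustration.
enum CloseOption { DEFAULT, LEAVE_GROUP }

// Hypothetical stand-in for an internal (non-public) consumer class.
class SketchLegacyConsumer {
    private boolean closed = false;
    private boolean leftGroup = false;

    // Existing-style close keeps its behavior by delegating with the default option.
    void close(Duration timeout) {
        close(timeout, CloseOption.DEFAULT);
    }

    // New overload: the caller states at close time whether to leave the group,
    // so no separate override call or mutable flag is needed beforehand.
    void close(Duration timeout, CloseOption option) {
        if (closed) {
            return; // closing twice is a no-op in this sketch
        }
        leftGroup = (option == CloseOption.LEAVE_GROUP);
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }

    boolean hasLeftGroup() {
        return leftGroup;
    }
}
```

Passing the option to `close` keeps the decision at the call site, which is one way the internal class could later line up with a public API of the same shape.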