frankvicky commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2057856462
##########
tools/src/test/java/org/apache/kafka/tools/ResetIntegrationTest.java:
##########
@@ -256,9 +255,8 @@ public void
testResetWhenLongSessionTimeoutConfiguredWithForceOption(final TestI
streams = new
KafkaStreams(setupTopologyWithoutIntermediateUserTopic(), streamsConfig);
streams.cleanUp();
- // Reset would fail since long session timeout has been configured
final boolean cleanResult = tryCleanGlobal(false, null, null, appID);
- assertFalse(cleanResult);
+ assertTrue(cleanResult);
Review Comment:
In https://github.com/apache/kafka/pull/19400/files#r2036678414
We have passed `leaveGroup=true` to the close method, which will make all
consumers leave the group.
To keep the original behavior, we should pass `leaveGroup=false` to the
close method.
I think we should maintain the original behavior for now; I will change it
to false in the next commit.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1805,12 +1816,12 @@ private void completeShutdown(final boolean cleanRun) {
log.error("Failed to unsubscribe due to the following error: ", e);
}
try {
- mainConsumer.close();
+ mainConsumer.close(closeOptions);
} catch (final Throwable e) {
log.error("Failed to close consumer due to the following error:",
e);
}
try {
- restoreConsumer.close();
+ restoreConsumer.close(closeOptions);
Review Comment:
If the consumer doesn't subscribe, it will not join a group.
In this case, it will get an unknown member ID in the response.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]