frankvicky commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2134956959
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1205,45 +1200,9 @@ private Optional<String> removeStreamThread(final long timeoutMs) throws Timeout
                 final long cacheSizePerThread = cacheSizePerThread(numLiveStreamThreads());
                 log.info("Resizing thread cache due to thread removal, new cache size per thread is {}", cacheSizePerThread);
                 resizeThreadCache(cacheSizePerThread);
-                if (groupInstanceID.isPresent() && callingThreadIsNotCurrentStreamThread) {
-                    final MemberToRemove memberToRemove = new MemberToRemove(groupInstanceID.get());
-                    final Collection<MemberToRemove> membersToRemove = Collections.singletonList(memberToRemove);
-                    final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult =
-                        adminClient.removeMembersFromConsumerGroup(
-                            applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
-                            new RemoveMembersFromConsumerGroupOptions(membersToRemove)
-                        );
-                    try {
-                        final long remainingTimeMs = timeoutMs - (time.milliseconds() - startMs);
-                        removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS);
-                    } catch (final java.util.concurrent.TimeoutException exception) {
-                        log.error(
-                            String.format(
-                                "Could not remove static member %s from consumer group %s due to a timeout:",
-                                groupInstanceID.get(),
-                                applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                            ),
-                            exception
-                        );
-                        throw new TimeoutException(exception.getMessage(), exception);
-                    } catch (final InterruptedException e) {
-                        Thread.currentThread().interrupt();
-                    } catch (final ExecutionException exception) {
-                        log.error(
-                            String.format(
-                                "Could not remove static member %s from consumer group %s due to:",
-                                groupInstanceID.get(),
-                                applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                            ),
-                            exception
-                        );
-                        throw new StreamsException(
-                            "Could not remove static member " + groupInstanceID.get()
-                                + " from consumer group " + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                + " for the following reason: ",
-                            exception.getCause()
-                        );
-                    }
Review Comment:
Nice catch!
Let's wait for more feedback on it.
I hope we can reach a consensus on this change.