[ https://issues.apache.org/jira/browse/KAFKA-18067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17932778#comment-17932778 ]
A. Sophie Blee-Goldman commented on KAFKA-18067:
------------------------------------------------
Thanks for catching this. [~frankvicky], since we had to revert the initial fix,
do you want to revisit this issue and retry? It seems we should not piggyback on the
existing #close method to set the "don't reinitialize" flag, since #close is called
not only during shutdown but also from the `#resetProducer` method.
Basically, we want a separate method that "permanently" closes the producer, called
only when the thread is shutting down, and that method should set the flag.
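A minimal sketch of that split, in Java, under a heavily simplified model. `FakeProducer`, `StreamsProducerSketch`, `closeForShutdown()` and the `closedForShutdown` flag are hypothetical names used only for illustration. They are not the real StreamsProducer API. The existing `close()` stays flag-free so `resetProducer()` can keep using it. Only the shutdown-only method sets the flag, which turns any later reset into a no-op.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a producer client: it only counts close() calls.
class FakeProducer {
    int closeCount = 0;

    void close() {
        closeCount++;
    }
}

// Hypothetical, heavily simplified StreamsProducer; NOT the real Kafka Streams class.
class StreamsProducerSketch {
    final List<FakeProducer> created = new ArrayList<>();
    private FakeProducer producer;
    private boolean closedForShutdown = false; // the "don't reinitialize" flag

    StreamsProducerSketch() {
        producer = createProducer();
    }

    private FakeProducer createProducer() {
        FakeProducer p = new FakeProducer();
        created.add(p);
        return p;
    }

    // Existing close: shared with resetProducer(), so it must NOT set the flag.
    void close() {
        producer.close();
    }

    // Hypothetical shutdown-only close: sets the flag, then closes the client once.
    void closeForShutdown() {
        if (closedForShutdown) {
            return;
        }
        closedForShutdown = true;
        close();
    }

    // Reset under EOS (e.g. from onPartitionsLost): close the old client, create a new one.
    void resetProducer() {
        if (closedForShutdown) {
            return; // thread is shutting down: skip reinitialization entirely
        }
        close();
        producer = createProducer();
    }
}

class ResetVsShutdownDemo {
    public static void main(String[] args) {
        StreamsProducerSketch sp = new StreamsProducerSketch();
        sp.resetProducer();    // normal reset: still creates a replacement client
        sp.closeForShutdown(); // thread shutdown
        sp.resetProducer();    // late onPartitionsLost during unsubscribe: now a no-op
        for (FakeProducer p : sp.created) {
            System.out.println("closeCount=" + p.closeCount);
        }
    }
}
```

Suppose the flag were instead set inside `close()`, as in the reverted approach. Then the first ordinary `resetProducer()` call would set it as a side effect, and every later reset would silently stop creating a replacement client.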
> Kafka Streams can leak Producer client under EOS
> ------------------------------------------------
>
> Key: KAFKA-18067
> URL: https://issues.apache.org/jira/browse/KAFKA-18067
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: A. Sophie Blee-Goldman
> Assignee: TengYao Chi
> Priority: Major
> Labels: newbie, newbie++
> Fix For: 4.1.0
>
>
> Under certain conditions, Kafka Streams can end up closing a producer client
> twice and creating a new one that is then never closed.
> During a StreamThread's shutdown, the TaskManager is closed first, which also
> closes the thread's producer client. Later on we call
> #unsubscribe on the main consumer, which can result in the #onPartitionsLost
> callback being invoked and ultimately trying to reset/reinitialize the
> StreamsProducer if EOS is enabled. This in turn includes closing the current
> producer and creating a new one. And since the current producer was already
> closed, we end up closing that client twice and never closing the newly
> created producer.
> Ideally we would just skip the reset/reinitialize process entirely when it is
> invoked during shutdown. This solves both problems here (the leaked client and
> the double close), while also removing the unnecessary overhead of creating an
> entirely new client just to throw it away.
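The shutdown ordering described above can be reproduced with a toy model. This is a sketch under simplifying assumptions. `Client`, `ShutdownSequence` and its methods are hypothetical names, not Kafka Streams classes. The model also assumes that unsubscribing always triggers onPartitionsLost, whereas in practice it only can. Constructing it with `true` applies the "skip the reset during shutdown" idea.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical producer client that records how many times it was closed.
class Client {
    int closeCount = 0;

    void close() {
        closeCount++;
    }
}

// Hypothetical model of a StreamThread shutdown under EOS; not a real Kafka Streams type.
class ShutdownSequence {
    final List<Client> clients = new ArrayList<>();
    private Client producer;
    private final boolean skipResetWhenShuttingDown;
    private boolean shuttingDown = false;

    ShutdownSequence(boolean skipResetWhenShuttingDown) {
        this.skipResetWhenShuttingDown = skipResetWhenShuttingDown;
        producer = create();
    }

    private Client create() {
        Client c = new Client();
        clients.add(c);
        return c;
    }

    // Step 1: closing the TaskManager also closes the thread's producer.
    void closeTaskManager() {
        shuttingDown = true;
        producer.close();
    }

    // Step 2: unsubscribing the main consumer invokes onPartitionsLost (assumed always here).
    void unsubscribeMainConsumer() {
        onPartitionsLost();
    }

    // Under EOS, losing partitions resets the producer: close the old one, create a new one.
    private void onPartitionsLost() {
        if (skipResetWhenShuttingDown && shuttingDown) {
            return; // the proposed fix: no reset while the thread shuts down
        }
        producer.close();    // second close of the already-closed client
        producer = create(); // new client that nothing will ever close
    }

    void shutdownThread() {
        closeTaskManager();
        unsubscribeMainConsumer();
    }

    long leakedClients() {
        return clients.stream().filter(c -> c.closeCount == 0).count();
    }

    long doubleClosedClients() {
        return clients.stream().filter(c -> c.closeCount > 1).count();
    }
}

class ShutdownLeakDemo {
    public static void main(String[] args) {
        for (boolean skip : new boolean[] {false, true}) {
            ShutdownSequence s = new ShutdownSequence(skip);
            s.shutdownThread();
            System.out.println("skipReset=" + skip
                    + " clients=" + s.clients.size()
                    + " leaked=" + s.leakedClients()
                    + " doubleClosed=" + s.doubleClosedClients());
        }
    }
}
```

Without the guard, the model ends with two clients: the original is closed twice and the replacement is never closed. With the guard, only the original client exists, and it is closed exactly once.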