[
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869711#comment-17869711
]
Chris Egerton edited comment on KAFKA-17044 at 7/30/24 5:09 PM:
----------------------------------------------------------------
Sorry, just to be extra clear: I was asking whether the connector _queried_ the
database synchronously, which, after looking at the code, it does not. Yes, it
tries to establish a connection in {{start}}, but it delegates all fetching of
table names to a separate thread. I'm just making note of this in case other
readers stumble on this ticket; I want to make sure nobody gets misled.
It also looks like it would be trivial to refactor the connector to establish a
database connection outside of {{start}}, which IMO would be an improvement
to prevent resource leaks like the ones described by this ticket.
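A minimal sketch of that kind of refactoring, assuming a hypothetical connector class (it is not the connector discussed here and does not use the Kafka Connect {{SourceConnector}} API; {{DeferredConnectionConnector}}, {{ConnectionFactory}} and {{awaitConnected}} are made-up names). {{start}} only launches a background thread that opens and retries the connection, and {{stop}} interrupts that thread and closes whatever connection it managed to open, so a slow or unreachable database never keeps {{start}} blocked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: not the connector discussed in this ticket, and not the Kafka Connect API.
class DeferredConnectionConnector {
    // Hypothetical stand-in for opening a database connection; may block or throw.
    interface ConnectionFactory {
        AutoCloseable open() throws Exception;
    }

    private final ConnectionFactory factory;
    private final long retryBackoffMs;
    private final AtomicReference<AutoCloseable> connection = new AtomicReference<>();
    private final CountDownLatch connected = new CountDownLatch(1);
    private volatile boolean stopping = false;
    private volatile Thread connectThread;

    DeferredConnectionConnector(ConnectionFactory factory, long retryBackoffMs) {
        this.factory = factory;
        this.retryBackoffMs = retryBackoffMs;
    }

    // Returns immediately: connection attempts and retries run on a background thread.
    void start() {
        Thread t = new Thread(this::connectWithRetries, "db-connect");
        t.setDaemon(true);
        connectThread = t;
        t.start();
    }

    private void connectWithRetries() {
        while (!stopping) {
            try {
                connection.set(factory.open());
                connected.countDown();
                return;
            } catch (Exception e) {
                try {
                    Thread.sleep(retryBackoffMs); // back off before the next attempt
                } catch (InterruptedException ie) {
                    return; // stop() interrupted the back-off: give up
                }
            }
        }
    }

    // Lets callers (or tests) wait for the background connection attempt.
    boolean awaitConnected(long timeoutMs) {
        try {
            return connected.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Cancels pending retries, waits for the background thread, then closes any open connection.
    void stop() {
        stopping = true;
        Thread t = connectThread;
        if (t != null) {
            t.interrupt();
            boolean interrupted = false;
            while (t.isAlive()) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    interrupted = true;
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        AutoCloseable c = connection.getAndSet(null);
        if (c != null) {
            try {
                c.close();
            } catch (Exception e) {
                // A real connector would log this failure.
            }
        }
    }
}
```

Joining the background thread before closing means a connection that succeeds just as {{stop}} is called is still closed rather than leaked. Interrupting only shortens the back-off sleeps, though; a driver call that blocks without honouring interrupts still has to return on its own before {{stop}} completes.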
> Connector deletion can lead to resource leak during a long running connector startup
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: Bhagyashree
> Priority: Major
>
> We have identified a gap in the shutdown flow for the connector worker. If
> the connector is in the
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
> state and is still executing the
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
> method, a DELETE API call invokes
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
> and [notify()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297],
> but the connector worker does not shut down immediately. This happens because
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> is a blocking call, and control reaches
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
> in
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
> only after the
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> call has completed. This leaves a gap in the delete flow: the connector is
> not shut down immediately, and its resources keep running.
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> keeps running, and only when
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> completes does control reach
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176],
> after which the connector worker's
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
> is invoked.
> This seems similar to what has been identified for connector tasks as part of
> https://issues.apache.org/jira/browse/KAFKA-14725.
> *Steps to repro*
> 1. Start a connector that performs a time-consuming operation in its
> [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> call.
> 2. Call the DELETE API to delete this connector.
> 3. The connector is deleted only after
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> completes.
> The issue was observed when a connector was configured to retry a DB
> connection for some time.
> {*}Current Behaviour{*}: The connector did not shut down until the
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> method completed.
> {*}Expected Behaviour{*}: The connector should abort what it is doing and
> shut down as requested by the DELETE call.
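The following is a simplified, hypothetical model of the flow described in the ticket. It is not the {{WorkerConnector}} source; {{StartupShutdownModel}}, {{launch}}, {{connectAttempts}}, {{backoffMs}} and {{interruptOnShutdown}} are made-up names for illustration. With {{interruptOnShutdown = false}} it reproduces the reported gap: the notification sent by {{shutdown()}} wakes nobody while {{start()}} is still retrying, so the shutdown is only observed once {{start()}} returns and the worker reaches {{wait()}}. With {{interruptOnShutdown = true}} it sketches one possible way to get the expected behaviour, by interrupting the thread that is stuck in {{start()}}; this is an assumption about an approach, not the fix adopted for this ticket.

```java
// Hypothetical, simplified model of the flow in this ticket; NOT the WorkerConnector source.
class StartupShutdownModel {
    private final Object lock = new Object();
    private final int connectAttempts;          // hypothetical retry count inside start()
    private final long backoffMs;               // hypothetical delay between attempts
    private final boolean interruptOnShutdown;  // false: current behaviour, true: expected behaviour
    private volatile boolean stopping = false;
    private volatile Thread runner;
    private volatile long shutdownAtNanos = -1;

    StartupShutdownModel(int connectAttempts, long backoffMs, boolean interruptOnShutdown) {
        this.connectAttempts = connectAttempts;
        this.backoffMs = backoffMs;
        this.interruptOnShutdown = interruptOnShutdown;
    }

    // Starts the worker thread; runner is assigned before start() so shutdown() can always see it.
    Thread launch() {
        Thread t = new Thread(this::doRun, "connector-worker");
        t.setDaemon(true);
        runner = t;
        t.start();
        return t;
    }

    // Worker thread body: a blocking start() followed by a wait() loop.
    private void doRun() {
        try {
            // Stand-in for a connector start() that keeps retrying an unreachable database.
            for (int i = 0; i < connectAttempts; i++) {
                Thread.sleep(backoffMs);
            }
        } catch (InterruptedException e) {
            // start() was cut short by shutdown(); fall through to the shutdown check.
        }
        synchronized (lock) {
            while (!stopping) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    // Interrupts only come from shutdown(); the loop re-checks the flag.
                }
            }
        }
        shutdownAtNanos = System.nanoTime(); // stands in for doShutdown()
    }

    // Called from another thread, e.g. when a DELETE request arrives.
    void shutdown() {
        synchronized (lock) {
            stopping = true;
            lock.notifyAll(); // only wakes the worker if it has already reached wait()
        }
        Thread t = runner;
        if (interruptOnShutdown && t != null) {
            t.interrupt(); // abort a start() that is still running
        }
    }

    long shutdownAtNanos() {
        return shutdownAtNanos;
    }
}
```

Because the shutdown flag is set before notifying, the request is never lost in either mode; it is only delayed. Interrupting helps only when the code inside {{start()}} reacts to interruption (as {{Thread.sleep}} does); a blocking driver call that ignores interrupts would still hold up the shutdown.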