[ https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867047#comment-17867047 ]
Bhagyashree edited comment on KAFKA-17044 at 7/18/24 4:12 PM:
--------------------------------------------------------------
[~ChrisEgerton], I agree there are ways to make the connector's start method
finish faster. But what I wanted to convey with this JIRA is that the
shutdown path waits for connector startup to finish: if a DELETE call is
made, it is not honoured until the connector's start completes.
Take the connector where we saw this, the JDBC source connector
([kafka-connect-jdbc|https://github.com/confluentinc/kafka-connect-jdbc/tree/master]).
Its
[stop|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java#L219C9-L219C33]
method updates a [static
member|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java#L106]
of CachedConnectionProvider, which in turn is expected to [stop the
retries|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/util/CachedConnectionProvider.java#L80-L101].
As I see it, the connector is written to honour shutdown attempts made in
the middle of the retries, but the runtime does not call the connector's
[stop()|https://github.com/confluentinc/kafka-connect-jdbc/blob/master/src/main/java/io/confluent/connect/jdbc/JdbcSourceConnector.java#L210]
while start() is still running.
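The shutdown pattern described above (stop() flips a flag that the retry loop in start() checks) can be sketched as follows. This is a hypothetical, self-contained model, not the kafka-connect-jdbc code: the class name, tryConnect(), and the retry parameters are assumptions, and it uses an instance field where the linked code updates a static member. The flag only helps if something calls stop() while start() is still looping.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical connector whose start() retries a connection until stop() is called. */
public class StoppableRetryingConnector {
    // volatile so a stop() call from another thread is seen by the retry loop
    private volatile boolean stopRequested = false;
    final AtomicInteger attempts = new AtomicInteger();

    // Hypothetical stand-in for opening a database connection; always fails here.
    private boolean tryConnect() {
        attempts.incrementAndGet();
        return false;
    }

    /** Retries until the attempt budget runs out or stop() is called. */
    public boolean start(int maxAttempts, long backoffMillis) {
        for (int i = 0; i < maxAttempts && !stopRequested; i++) {
            if (tryConnect()) {
                return true;
            }
            try {
                Thread.sleep(backoffMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /** Aborts the retry loop, but only if someone actually calls it mid-retry. */
    public void stop() {
        stopRequested = true;
    }

    public static void main(String[] args) throws InterruptedException {
        StoppableRetryingConnector c = new StoppableRetryingConnector();
        Thread t = new Thread(() -> c.start(1_000, 10)); // up to ~10 s of retries
        t.start();
        Thread.sleep(100);
        c.stop(); // simulates a stop() call arriving while start() is retrying
        t.join();
        System.out.println("gave up after " + c.attempts.get() + " attempts");
    }
}
```

When stop() is invoked concurrently, start() returns after a handful of attempts instead of exhausting the full retry budget.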
This looks similar to https://issues.apache.org/jira/browse/KAFKA-14725, but
for connectors rather than tasks.
Let me know your thoughts.
> Connector deletion can lead to resource leak during a long running connector
> startup
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: Bhagyashree
> Priority: Major
>
> We have identified a gap in the connector worker's shutdown flow. If the
> connector is in the
> [INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
> state and still executing the
> [WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
> method, a DELETE API call invokes
> [WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
> and [notify()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297],
> but the connector worker does not shut down immediately. This happens because
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> is a blocking call: control reaches
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
> in
> [doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
> only after the
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> call has completed. This leaves a gap in the delete flow where the
> connector is not shut down immediately and its resources keep running.
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> keeps running, and only once its execution completes does control reach
> [wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176],
> after which the connector worker's
> [doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
> is invoked.
> This seems similar to what has been identified for connector tasks as part of
> https://issues.apache.org/jira/browse/KAFKA-14725.
> *Steps to repro*
> 1. Start a connector that performs a time-consuming operation in its
> [connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> call.
> 2. Call the DELETE API to delete this connector.
> 3. The connector is deleted only after
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> completes.
> The issue was observed when a connector was configured to retry a database
> connection for some time.
> {*}Current Behaviour{*}: The connector did not shut down until the
> [start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
> method completed.
> {*}Expected Behaviour{*}: The connector should abort what it is doing and
> shut down as requested by the DELETE call.
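The blocking sequence described in the issue can be reproduced with a simplified, hypothetical model of the WorkerConnector control flow. It is not the Kafka Connect source: SimplifiedWorkerConnector, slowStart(), and the timing fields are illustrative names, and Thread.sleep() stands in for a long-running connector.start() such as a database connection retry. shutdown() sets the stopping flag and calls notify(), but because the connector thread is still inside the blocking start call, nobody is waiting yet; the flag is only checked once the start call returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified model of the doRun()/shutdown() interaction; not Kafka code. */
public class SimplifiedWorkerConnector implements Runnable {
    private final long startMillis;            // how long the blocking start() takes
    private boolean stopping = false;          // guarded by "this"
    private volatile long shutdownRequestedAt; // System.nanoTime() when shutdown() ran
    private volatile long doShutdownAt;        // System.nanoTime() when doShutdown() ran
    final CountDownLatch inStart = new CountDownLatch(1);

    SimplifiedWorkerConnector(long startMillis) {
        this.startMillis = startMillis;
    }

    @Override
    public void run() {     // plays the role of doRun()
        slowStart();        // plays the role of doStart() -> connector.start()
        synchronized (this) {
            // This check (and any wait()) is only reached after start() returns.
            while (!stopping) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
        doShutdown();
    }

    private void slowStart() {
        inStart.countDown(); // signal that start() is now in progress
        try {
            Thread.sleep(startMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Plays the role of WorkerConnector::shutdown, triggered by a DELETE call. */
    public synchronized void shutdown() {
        shutdownRequestedAt = System.nanoTime();
        stopping = true;
        notify(); // no thread is waiting yet while start() is still running
    }

    private void doShutdown() {
        doShutdownAt = System.nanoTime();
    }

    long shutdownDelayMillis() {
        return TimeUnit.NANOSECONDS.toMillis(doShutdownAt - shutdownRequestedAt);
    }

    public static void main(String[] args) throws InterruptedException {
        SimplifiedWorkerConnector wc = new SimplifiedWorkerConnector(2_000);
        Thread t = new Thread(wc, "connector-thread");
        t.start();
        wc.inStart.await(); // the DELETE arrives while start() is still running
        wc.shutdown();
        t.join();
        System.out.println("doShutdown() ran " + wc.shutdownDelayMillis() + " ms after shutdown()");
    }
}
```

Running main should report a delay close to the 2,000 ms start time, which matches the Current Behaviour above: the shutdown request is recorded immediately but acted on only after start returns.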
--
This message was sent by Atlassian Jira
(v8.20.10#820010)