[ 
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhagyashree updated KAFKA-17044:
--------------------------------
    Description: 
We have identified a gap in the shutdown flow for the connector worker. If the 
connector is in 
[INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
 state and still executing the 
[WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
 method, a DELETE API call would invoke the 
[WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
 and [notify()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297],
 but the connector worker would not shut down immediately. This happens because 
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
 is a blocking call and the control reaches 
[wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
 in 
[doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
 after the 
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
 call has completed. This results in a gap in the delete flow where the 
connector is not shut down immediately, leaving its resources running. 
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
 keeps running, and only when the execution of 
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
 completes do we reach 
[wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176],
 after which 
[doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
 of the connector worker is invoked.
This seems similar to what has been identified for connector tasks as part of 
https://issues.apache.org/jira/browse/KAFKA-14725.
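The flow described above can be modelled with a small standalone sketch. It is not the actual WorkerConnector code: the class ShutdownGapSketch, its fields, and the use of Thread.sleep to stand in for a blocking start() are all assumptions made for illustration. It only shows why a notify() issued during start() has no immediate effect: nothing is waiting on the lock yet, so the stop flag is noticed only once start() returns.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified model of the flow described above.
// It is NOT the real WorkerConnector; all names here are illustrative.
class ShutdownGapSketch {
    private final Object lock = new Object();
    private boolean stopping = false; // guarded by lock
    private final CountDownLatch shutdownDone = new CountDownLatch(1);
    private final long startMillis;

    ShutdownGapSketch(long startMillis) {
        this.startMillis = startMillis;
    }

    // Stands in for doRun(): the blocking start runs first, the wait loop second.
    void run() {
        try {
            Thread.sleep(startMillis); // stands in for the blocking start() call
            synchronized (lock) {
                while (!stopping) {
                    lock.wait();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            shutdownDone.countDown(); // stands in for doShutdown()
        }
    }

    // Stands in for shutdown(): set the flag and notify. Nothing is waiting on
    // the lock while the worker thread is still inside the blocking start.
    void shutdown() {
        synchronized (lock) {
            stopping = true;
            lock.notify();
        }
    }

    // Requests shutdown right after the worker thread is started and returns
    // the milliseconds that pass before the simulated doShutdown() is reached.
    static long measureShutdownDelayMillis(long startMillis) {
        ShutdownGapSketch worker = new ShutdownGapSketch(startMillis);
        Thread thread = new Thread(worker::run, "connector-sketch");
        thread.start();
        long requestedAt = System.nanoTime();
        worker.shutdown();
        try {
            if (!worker.shutdownDone.await(startMillis + 10_000, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("simulated shutdown never completed");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - requestedAt);
    }

    public static void main(String[] args) {
        long delay = measureShutdownDelayMillis(500);
        System.out.println("Shutdown completed " + delay + " ms after it was requested");
    }
}
```

In this model the measured delay tracks the duration of the simulated start, which mirrors the behaviour reported here.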
*Steps to repro*
1. Start a connector that performs a time-consuming operation in its 
[connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
 call.
2. Call the DELETE API to delete this connector.
3. The connector is deleted only after 
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
 completes.

The issue was observed when a connector was configured to retry a database 
connection for some time. 
{*}Current Behaviour{*}: The connector did not shut down until the 
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
 method completed.
{*}Expected Behaviour{*}: The connector should abort what it is doing and 
shut down as requested by the DELETE call.
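As an illustration of what "abort what it is doing" could look like for a retry loop such as the database connection retries mentioned above, here is a minimal sketch. It is an assumption, not the fix tracked by this issue: the class AbortableRetrySketch and the requestStop() hook are hypothetical, and the sketch presumes that some component is able to signal the stop request while start() is still running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a retry loop that can be aborted between attempts.
class AbortableRetrySketch {
    private final CountDownLatch stopRequested = new CountDownLatch(1);

    // Hypothetical hook; something outside the retry loop must call this.
    void requestStop() {
        stopRequested.countDown();
    }

    // Returns true if an attempt succeeded, false if the loop was stopped,
    // interrupted, or ran out of attempts.
    boolean connectWithRetries(BooleanSupplier attempt, int maxAttempts, long backoffMillis) {
        for (int i = 0; i < maxAttempts; i++) {
            if (stopRequested.getCount() == 0) {
                return false;
            }
            if (attempt.getAsBoolean()) {
                return true;
            }
            try {
                // Back off, but wake up at once if a stop is requested.
                if (stopRequested.await(backoffMillis, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    // Runs a loop that would otherwise retry for about 100 seconds, requests
    // a stop, and returns how many milliseconds the loop took to give up.
    static long measureAbortMillis() {
        AbortableRetrySketch sketch = new AbortableRetrySketch();
        long startedAt = System.nanoTime();
        Thread thread = new Thread(
                () -> sketch.connectWithRetries(() -> false, 100, 1_000), "retry-sketch");
        thread.start();
        sketch.requestStop();
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startedAt);
    }

    public static void main(String[] args) {
        System.out.println("Retry loop stopped after " + measureAbortMillis() + " ms");
    }
}
```

Waiting on the latch instead of sleeping lets the backoff end as soon as a stop is requested, so the loop does not have to exhaust its retries first.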

  was:
We have identified a gap in the shutdown flow for the connector worker. If the 
connector is in INIT state and still executing the WorkerConnector::doStart 
method, a DELETE API call would invoke the WorkerConnector::shutdown and 
notify(), but the connector worker would not shut down immediately. This happens 
because start() is a blocking call and the control reaches wait() in doRun() 
after the start() call has completed. This results in a gap in the delete flow 
where the connector is not shut down immediately, leaving its resources running. 
start() keeps running, and only when the execution of start() completes do we 
reach wait(), after which doShutdown() of the connector worker is 
invoked.
This seems similar to what has been identified for connector tasks as part of 
https://issues.apache.org/jira/browse/KAFKA-14725.
*Steps to repro*
1. Start a connector that performs a time-consuming operation in its connector.start() call.
2. Call the DELETE API to delete this connector.
3. The connector is deleted only after start() completes.

The issue was observed when a connector was configured to retry a database 
connection for some time. 
{*}Current Behaviour{*}: The connector did not shut down until it exhausted the 
retry count.
{*}Expected Behaviour{*}: The connector should abort what it is doing and 
shut down as requested by the DELETE call.


> Connector worker does not shutdown until start completes 
> ---------------------------------------------------------
>
>                 Key: KAFKA-17044
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17044
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>            Reporter: Bhagyashree
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
