[
https://issues.apache.org/jira/browse/KAFKA-17044?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bhagyashree updated KAFKA-17044:
--------------------------------
Description:
We have identified a gap in the shutdown flow for the connector worker. If the
connector is in
[INIT|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L403-L404]
state and still executing the
[WorkerConnector::doStart|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L207-L218]
method, a DELETE API call would invoke the
[WorkerConnector::shutdown|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L294-L298]
and
[notify()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L297],
but the connector worker would not shut down immediately. This happens because
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
is a blocking call, and control reaches
[wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176]
in
[doRun()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L151]
after the
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
call has completed. As a result, the connector is not shut down immediately when
it is deleted, and its resources keep running.
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
keeps running, and only when the execution of
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
completes does control reach
[wait()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L176],
after which
[doShutdown()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L183]
of the connector worker is invoked.
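The ordering problem described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: the class {{BlockingStartDemo}} and its methods are hypothetical stand-ins that mimic the start-then-wait shape of the flow, not the actual WorkerConnector code, and {{Thread.sleep}} stands in for a blocking start().

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified model of the worker loop; NOT the real WorkerConnector. */
public class BlockingStartDemo {
    private boolean stopping = false;      // guarded by "this"
    private long shutdownObservedAtNanos;  // written by the worker thread, read after join()

    /** Stand-in for a start() that blocks, e.g. while retrying a connection. */
    private void slowStart(long millis) throws InterruptedException {
        Thread.sleep(millis);
    }

    /** Start first, then wait for a stop request, as in the flow described above. */
    void doRun(long startMillis) throws InterruptedException {
        slowStart(startMillis);            // a shutdown request cannot cut this short
        synchronized (this) {
            while (!stopping) {
                wait();                    // only reached once slowStart() has returned
            }
        }
        shutdownObservedAtNanos = System.nanoTime(); // where the shutdown work would run
    }

    /** Sets the flag and notifies; has no effect on a thread still inside slowStart(). */
    synchronized void shutdown() {
        stopping = true;
        notify();
    }

    /** Returns how long (ms) the shutdown request took to be acted on. */
    static long measureShutdownDelayMs(long startMillis, long deleteAfterMillis) {
        BlockingStartDemo worker = new BlockingStartDemo();
        Thread thread = new Thread(() -> {
            try {
                worker.doRun(startMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        try {
            thread.start();
            Thread.sleep(deleteAfterMillis);   // the "DELETE" arrives while start is running
            long requestedAtNanos = System.nanoTime();
            worker.shutdown();
            thread.join();
            return TimeUnit.NANOSECONDS.toMillis(worker.shutdownObservedAtNanos - requestedAtNanos);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long delayMs = measureShutdownDelayMs(1000, 100);
        System.out.println("Shutdown took effect about " + delayMs + " ms after it was requested");
    }
}
```

In this model the stop flag is set as soon as shutdown() is called, but the worker thread only checks it after the blocking call returns, so the measured delay is roughly the remaining duration of the start call.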
This seems similar to what has been identified for connector tasks as part of
https://issues.apache.org/jira/browse/KAFKA-14725.
*Steps to repro*
1. Start a connector that performs a time-consuming operation in its
[connector.start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
call.
2. Call the DELETE API to delete this connector.
3. The connector is deleted only after
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
completes.
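For step 2, the delete request might be issued as in the sketch below. The base URL {{http://localhost:8083}} and the connector name {{slow-start-connector}} are assumptions made for illustration; substitute the values for the worker under test.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DeleteConnectorRequest {
    /** Builds a DELETE request for /connectors/{name} on the given Connect REST endpoint. */
    static HttpRequest build(String baseUrl, String connectorName) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/connectors/" + connectorName))
                .DELETE()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumed values: a local worker and a hypothetical connector name.
        HttpRequest request = build("http://localhost:8083", "slow-start-connector");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```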
The issue was observed when a connector was configured to retry a database
connection for some time.
{*}Current Behaviour{*}: The connector did not shut down until the
[start()|https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java#L216]
method completed.
{*}Expected Behaviour{*}: The connector should abort what it is doing and
shut down as requested by the DELETE call.
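One possible way to get the expected behaviour, shown purely as an illustration, is to run the blocking start on a separate thread and interrupt it when shutdown is requested. This is not presented as the fix for this issue: the class {{InterruptibleStartDemo}} and its methods are hypothetical, and the approach only helps if the start logic actually responds to thread interruption, which is an assumption here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch: a blocking start that a shutdown request can interrupt. */
public class InterruptibleStartDemo {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private Future<?> startFuture;

    /** Runs the stand-in for a blocking start() on a dedicated thread. */
    void startAsync(long startMillis) {
        startFuture = executor.submit(() -> {
            try {
                Thread.sleep(startMillis);          // stand-in for a long retry loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop promptly when interrupted
            }
        });
    }

    /** Interrupts the in-flight start and waits for the thread to finish. */
    boolean shutdown(long timeoutMillis) throws InterruptedException {
        startFuture.cancel(true);                   // true = interrupt if running
        executor.shutdown();
        return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns how long (ms) the shutdown request took to complete. */
    static long measureShutdownDelayMs(long startMillis, long deleteAfterMillis) {
        InterruptibleStartDemo worker = new InterruptibleStartDemo();
        try {
            worker.startAsync(startMillis);
            Thread.sleep(deleteAfterMillis);        // the "DELETE" arrives while start is running
            long requestedAtNanos = System.nanoTime();
            if (!worker.shutdown(startMillis)) {
                throw new IllegalStateException("start did not stop within the timeout");
            }
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - requestedAtNanos);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long delayMs = measureShutdownDelayMs(5000, 100);
        System.out.println("Shutdown completed about " + delayMs + " ms after it was requested");
    }
}
```

The trade-off in this sketch is that an interrupted start may leave partially initialised resources behind, so any real implementation would still need to run the connector's cleanup logic afterwards.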
was:
We have identified a gap in the shutdown flow for the connector worker. If the
connector is in INIT state and still executing the WorkerConnector::doStart
method, a DELETE API call would invoke the WorkerConnector::shutdown and
notify(), but the connector worker would not shut down immediately. This happens
because start() is a blocking call, and control reaches wait() in doRun()
after the start() call has completed. As a result, the connector is not shut
down immediately when it is deleted, and its resources keep running.
start() keeps running, and only when the execution of start() completes does
control reach wait(), after which doShutdown() of the connector worker is
invoked.
This seems similar to what has been identified for connector tasks as part of
https://issues.apache.org/jira/browse/KAFKA-14725.
*Steps to repro*
1. Start a connector that performs a time-consuming operation in its
connector.start() call.
2. Call the DELETE API to delete this connector.
3. The connector is deleted only after start() completes.
The issue was observed when a connector was configured to retry a database
connection for some time.
{*}Current Behaviour{*}: The connector did not shut down until it exhausted the
retry count.
{*}Expected Behaviour{*}: The connector should abort what it is doing and
shut down as requested by the DELETE call.
> Connector worker does not shutdown until start completes
> ---------------------------------------------------------
>
> Key: KAFKA-17044
> URL: https://issues.apache.org/jira/browse/KAFKA-17044
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Reporter: Bhagyashree
> Priority: Major
>