[ https://issues.apache.org/jira/browse/KAFKA-10763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Konstantine Karantasis resolved KAFKA-10763.
--------------------------------------------
Resolution: Fixed
> Task already exists error on same worker due to skip removal of tasks
> ---------------------------------------------------------------------
>
> Key: KAFKA-10763
> URL: https://issues.apache.org/jira/browse/KAFKA-10763
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.3.0
> Reporter: Shao Wang
> Assignee: Greg Harris
> Priority: Major
> Fix For: 2.3.2, 2.4.2, 2.5.2, 2.8.0, 2.7.1, 2.6.2
>
>
> In our production environment, when two Kafka Connect workers are started,
> the leader bounces between worker1 and worker2 during the first couple of
> minutes, and many tasks throw a "Task already exists in this worker"
> exception on worker2.
> The sequence of events:
> worker2(hostname:sinkdp2)
> gen3 assign
> Start task 1
> gen4 assign task 1
> gen5 assign task 1
> gen6 revoke task 1 (stopping and removal of task 1 are skipped because the
> rebalance is unresolved)
> gen7 assign task 1
> Start task 1 (Task already exists error)
>
> Worker1 (hostname: sinkdp1)
> {code:java}
> 03:36:07,340 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:36:10,460 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 1 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842',
> leaderUrl='http://sinkdp1:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:36:10,694 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-5][DistributedHerder.java:1073]
> 03:36:10,979 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0
> with version 0.14.0-SNAPSHOT of type
> com.datapipeline.sink.connector.hive.HiveConnectorTask
> [pool-9-thread-5][Worker.java:426]
> 03:36:37,692 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:36:37,806 [INFO ] Stopping task dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-5][Worker.java:702]
> 03:40:09,721 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Finished stopping tasks in preparation for
> rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
> 03:40:09,722 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 2 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842',
> leaderUrl='http://sinkdp1:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_599_20,
> dp-tidb-connector-dptask_580_13, dp-hive-sink-connector-dpta
> 03:40:09,722 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:41:10,650 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Wasn't unable to resume work after last
> rebalance, can skip stopping connectors and tasks
> [DistributedHerder-connect-1][DistributedHerder.java:1517]
> 03:41:10,650 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 4 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4',
> leaderUrl='http://sinkdp2:8083/', offset=6457, connectorIds=[], taskIds=[],
> revokedConnectorIds=[dp-hive-sink-connector-dptask_599_20,
> dp-tidb-connector-dptask
> 03:41:10,651 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:42:10,815 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 5 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17',
> leaderUrl='http://sinkdp1:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:42:10,953 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-8][DistributedHerder.java:1073]
> 03:42:10,953 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0
> with version 0.14.0-SNAPSHOT of type
> com.datapipeline.sink.connector.hive.HiveConnectorTask
> [pool-9-thread-8][Worker.java:426]
> 03:42:29,429 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:43:28,336 [INFO ] Stopping task dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-2][Worker.java:702]
> 03:46:05,804 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Finished stopping tasks in preparation for
> rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
> 03:46:05,806 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 6 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17',
> leaderUrl='http://sinkdp1:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_599_20,
> dp-tidb-connector-dptask_580_13, dp-hive-sink-connector-dpta
> 03:46:05,806 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:47:06,564 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Wasn't unable to resume work after last
> rebalance, can skip stopping connectors and tasks
> [DistributedHerder-connect-1][DistributedHerder.java:1517]
> {code}
> Worker2 (hostname: sinkdp2)
> {code:java}
> 03:36:35,984 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:36:37,780 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 2 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-a5790b31-6890-4958-905d-d44be9e18842',
> leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[],
> revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0
> [Dist
> 03:37:40,789 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:37:40,916 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 3 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:37:41,151 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-1][DistributedHerder.java:1073]
> 03:37:41,507 [INFO ] Instantiated task dp-hive-sink-connector-dptask_475_22-0
> with version 0.14.0-SNAPSHOT of type
> com.datapipeline.sink.connector.hive.HiveConnectorTask
> [pool-9-thread-1][Worker.java:426]
> 03:40:13,254 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:42:27,376 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Finished stopping tasks in preparation for
> rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
> 03:42:27,377 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 4 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-ff52e9fd-c96c-4e50-93cf-25cf2ae430a4',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:42:27,378 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:43:28,190 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Wasn't unable to resume work after last
> rebalance, can skip stopping connectors and tasks
> [DistributedHerder-connect-1][DistributedHerder.java:1517]
> 03:43:28,191 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 6 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-5a0dbfd9-10d8-42dd-8efd-52ac6ba81e17',
> leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[],
> revokedConnectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-d
> 03:43:28,191 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:44:28,358 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 7 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-8494a329-d5a8-443c-8c90-ba710cc44c2d',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:44:28,692 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-7][DistributedHerder.java:1073]
> kafka.connect.errors.ConnectException: Task already exists in this worker:
> dp-hive-sink-connector-dptask_475_22-0
> 03:46:07,401 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:48:07,024 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Finished stopping tasks in preparation for
> rebalance [DistributedHerder-connect-1][DistributedHerder.java:1502]
> 03:48:07,246 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 8 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-8494a329-d5a8-443c-8c90-ba710cc44c2d',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpta
> 03:48:07,246 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:49:07,446 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Wasn't unable to resume work after last
> rebalance, can skip stopping connectors and tasks
> [DistributedHerder-connect-1][DistributedHerder.java:1517]
> 03:49:07,446 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 10 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-2a498edc-a517-457b-b982-c84cc9ff4521',
> leaderUrl='http://sinkdp1:8083/', offset=6457, connectorIds=[], taskIds=[],
> revokedConnectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-
> 03:49:07,447 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Rebalance started
> [DistributedHerder-connect-1][WorkerCoordinator.java:233]
> 03:50:07,677 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Joined group at generation 11 with protocol
> version 1 and got assignment: Assignment{error=0,
> leader='connect-1-d0be6a23-8dfa-4289-a818-8a655effa48d',
> leaderUrl='http://sinkdp2:8083/', offset=6457,
> connectorIds=[dp-hive-sink-connector-dptask_475_22,
> dp-hive-sink-connector-dptask_366_20, dp-tidb-connector-dpt
> 03:50:08,079 [INFO ] [Worker clientId=connect-1,
> groupId=group_connect_sink_dp] Starting task
> dp-hive-sink-connector-dptask_475_22-0
> [pool-9-thread-3][DistributedHerder.java:1073]
> kafka.connect.errors.ConnectException: Task already exists in this worker:
> dp-hive-sink-connector-dptask_475_22-0
> {code}
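
The sequence of events listed in the description can be replayed with a small
toy model. This is only a sketch under stated assumptions, not the actual
Kafka Connect code: the class RevocationSkipSketch, its fields, and the
workResumed flag are hypothetical names invented for illustration. It assumes
the herder keeps its own view of assigned tasks separately from the tasks the
worker is actually running, and that a revocation arriving while the rebalance
is unresolved updates the herder's view without stopping the task.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only. All names here are hypothetical, not Kafka Connect APIs.
final class RevocationSkipSketch {
    // What the herder believes is assigned to this worker.
    private final Set<String> assignedView = new HashSet<>();
    // What the worker is actually running.
    private final Set<String> workerTasks = new HashSet<>();
    // Assumption: true models the reported behaviour, false models always stopping.
    private final boolean skipStopWhenUnresolved;
    private boolean rebalanceResolved = true;

    RevocationSkipSketch(boolean skipStopWhenUnresolved) {
        this.skipStopWhenUnresolved = skipStopWhenUnresolved;
    }

    // A new generation assigns taskId; workResumed says whether the worker
    // got back to work before the next rebalance started.
    void onAssigned(String taskId, boolean workResumed) {
        if (assignedView.add(taskId)) {
            startTask(taskId); // only tasks new to the herder's view are started
        }
        rebalanceResolved = workResumed;
    }

    void onRevoked(String taskId) {
        assignedView.remove(taskId);
        if (!rebalanceResolved && skipStopWhenUnresolved) {
            return; // "can skip stopping connectors and tasks": task keeps running
        }
        workerTasks.remove(taskId);
    }

    private void startTask(String taskId) {
        if (!workerTasks.add(taskId)) {
            throw new IllegalStateException("Task already exists in this worker: " + taskId);
        }
    }

    // Replays gen3..gen7 from the description for worker2.
    static String replay(boolean skipStopWhenUnresolved) {
        RevocationSkipSketch herder = new RevocationSkipSketch(skipStopWhenUnresolved);
        try {
            herder.onAssigned("task-1", true);   // gen3: assign, start task 1
            herder.onAssigned("task-1", false);  // gen4: assign, rebalance restarts
            herder.onAssigned("task-1", false);  // gen5: assign, rebalance restarts
            herder.onRevoked("task-1");          // gen6: revoke
            herder.onAssigned("task-1", true);   // gen7: assign, start task 1 again
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(replay(true));
        System.out.println(replay(false));
    }
}
```

Running main prints "Task already exists in this worker: task-1" for the
skipping variant and "ok" when the revoked task is always stopped and removed.
The model says nothing about how the committed fix is implemented; it only
shows why a skipped removal followed by a reassignment to the same worker
leads to this exception.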