[
https://issues.apache.org/jira/browse/KAFKA-15675?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810057#comment-17810057
]
Chris Egerton commented on KAFKA-15675:
---------------------------------------
I've done some analysis on this one and believe I've found the root cause. It's
a confluence of a few different issues, but the TL;DR is: *the request to
{{POST /connectors/<connector>/restart?onlyFailed=false&includeTasks=false}}
fails with a 409 error; this does not cause the test to fail immediately, but
the connector is never restarted, which causes the test to time out while
[waiting for the connector to be
stopped|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L272-L275].*
This kind of scenario probably raises several questions. Here's my best attempt
to anticipate and address them:
*Why does the 409 response not cause the test to immediately fail?*
The original rationale for this is unclear, but the code structure
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L374-L383]
is fairly clear: issue the request, and if the status code is less than 400,
attempt to deserialize the body. Then, unconditionally, return either null or
the deserialized response body.
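As a rough illustration (the class and method names below are hypothetical stand-ins, not the actual {{EmbeddedConnect}} API), the pattern boils down to something like:

```java
// Hypothetical condensation of the lenient response-handling pattern
// described above; names are illustrative, not the real EmbeddedConnect code.
public class LenientRestartResponse {

    // Stand-in for deserializing the response body into a DTO
    static String deserialize(String body) {
        return body;
    }

    // Mirrors the structure: deserialize only when status < 400, then
    // unconditionally return -- so a 409 silently becomes null instead
    // of surfacing as a test failure.
    static String restartConnector(int status, String body) {
        String result = null;
        if (status < 400) {
            result = deserialize(body);
        }
        return result; // caller never learns the request was rejected
    }

    public static void main(String[] args) {
        // A 409 (rebalance pending) is indistinguishable from "no body"
        System.out.println(restartConnector(409, "{\"error\":409}"));          // null
        System.out.println(restartConnector(202, "{\"state\":\"RESTARTING\"}"));
    }
}
```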
*Why is the 409 response occurring?*
The cluster (or, to be more specific, either the worker that received the
initial REST request or, if the request was forwarded, the leader) detected
that a rebalance due to an added/removed connector or new task configs was
about to take place, and rejected the request. See the {{DistributedHerder}}
class's
[restartConnectorAndTasks|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L1467]
and
[checkRebalanceNeeded|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2302-L2307]
methods for the logic to check for pending rebalances, and its logic for
detecting pending rebalances
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2385],
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2400],
and
[here|https://github.com/apache/kafka/blob/4d6a422e8607c257e58b6956061732dc33621fc2/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L2419].
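Sketched very loosely (this is a simplified toy model; the real {{DistributedHerder}} tracks this state with different fields and signatures), the guard behaves like:

```java
import java.util.Optional;

// Simplified, hypothetical model of the pending-rebalance guard; the real
// DistributedHerder logic is linked above and differs in the details.
public class RebalanceGuardSketch {
    boolean configUpdateDetected;  // new connector/task configs observed on the config topic
    boolean membershipChanged;     // a worker joined or left the group
    boolean assignmentStale;       // current assignment predates the latest configs

    // Returns a 409 status when any condition marks a rebalance as pending
    Optional<Integer> checkRebalanceNeeded() {
        if (configUpdateDetected || membershipChanged || assignmentStale) {
            return Optional.of(409);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        RebalanceGuardSketch guard = new RebalanceGuardSketch();
        System.out.println(guard.checkRebalanceNeeded()); // Optional.empty
        guard.configUpdateDetected = true; // a slow worker just read new task configs
        System.out.println(guard.checkRebalanceNeeded()); // Optional[409]
    }
}
```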
*Why is a rebalance pending by the time we try to restart the connector?
Shouldn't the cluster and the set of connectors and tasks on it be stable by
this point?*
Yes, the cluster and set of connectors and tasks on it should be stable by the
time we issue our restart request. We check to make sure that [every worker in
the cluster is up and
running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L116-L117]
before proceeding with the rest of the test, and that the [connector and
expected number of tasks are
running|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java#L252-L253]
before issuing the restart request. Unfortunately, the former check (for worker
liveness across the cluster) does not guarantee that every worker has joined the
cluster. This check is [performed by issuing a request to the root
resource|https://github.com/apache/kafka/blob/0ef89a7cc059b0ba9b2536e018303c9004ac9142/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java#L956-L975]
({{{}GET /{}}}) for each worker: if the response is valid (i.e., its body
matches the expected format), then the worker is considered up and running.
However, this does not guarantee that the worker has actually completed
startup: it may not have finished reading to the end of internal topics, or had
a chance to contact the group coordinator and join the cluster yet.
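To make the gap concrete, here is a toy model (all names hypothetical) of why the existing probe can pass before a worker is actually ready:

```java
// Toy model of the gap between "REST server answers GET /" and "worker has
// actually joined the group"; the field and method names are hypothetical.
public class WorkerReadinessSketch {
    boolean restServerStarted;   // GET / returns a well-formed body
    boolean caughtUpOnTopics;    // read to the end of internal topics
    boolean joinedGroup;         // contacted the coordinator and joined

    // The check the test currently relies on
    boolean isUp() {
        return restServerStarted;
    }

    // The stronger check this analysis argues for
    boolean isReady() {
        return restServerStarted && caughtUpOnTopics && joinedGroup;
    }

    public static void main(String[] args) {
        WorkerReadinessSketch sluggishWorker = new WorkerReadinessSketch();
        sluggishWorker.restServerStarted = true; // REST is up almost immediately
        System.out.println(sluggishWorker.isUp());    // true: the test proceeds
        System.out.println(sluggishWorker.isReady()); // false: still mid-startup
    }
}
```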
After examining the logs of one test case, it appeared that the following
sequence of events took place:
# A single worker completes startup (creates and reads to the end of internal
topics, then joins the cluster)
# The connector is created (by chance, the REST request to create the
connector happens to be sent to the only worker that has completed startup so
far)
# The connector is assigned to the only worker currently in the cluster
# The connector generates task configs
# The tasks for that connector are assigned to the only worker currently in
the cluster
# The other, more sluggish, workers in the cluster detect the new connector
and/or task configs, and realize that a rebalance is pending
# An attempt is made to restart the connector (by chance, the REST request
happens to be sent to a worker that knows a rebalance is pending, but has not
yet completed that rebalance)
# The restart request is rejected with a 409 response
# The test fails
There are a few action items that come to mind based on this analysis:
# Unconditionally log an ERROR-level message in our integration testing
framework whenever a REST request is met with a response whose status code is
300 or higher
# Improve our worker liveness checks to guarantee not only that a worker's
REST server has started, but that it has had a chance to join the cluster
# Add retry logic when 409 responses are encountered during our integration
tests (this one is debatable, but our CI infrastructure is so miraculously
sluggish that rebalances caused by workers failing to read to the end of the
config topic in time may not be out of the realm of possibility). One possible
approach could be to re-perform a worker liveness check (one that guarantees
that a worker is caught up on the config topic and has had a chance to (re-)join
the cluster) and then re-issue the request, but only once.
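That last item could look roughly like the following (the request and readiness hooks are stand-ins for whatever the embedded cluster would expose, not real methods):

```java
import java.util.function.IntSupplier;

// Hypothetical sketch of "retry once on 409": re-check worker readiness,
// then re-issue the request exactly one more time.
public class RetryOn409Sketch {

    static int requestWithSingleRetry(IntSupplier sendRequest, Runnable awaitWorkersReady) {
        int status = sendRequest.getAsInt();
        if (status == 409) {
            // Wait until every worker is caught up on the config topic and
            // has (re-)joined the group, then retry exactly once
            awaitWorkersReady.run();
            status = sendRequest.getAsInt();
        }
        return status;
    }

    public static void main(String[] args) {
        // Simulate a first attempt rejected mid-rebalance, second accepted
        int[] responses = {409, 202};
        int[] attempt = {0};
        int status = requestWithSingleRetry(
                () -> responses[attempt[0]++],
                () -> { /* re-run the liveness/readiness checks here */ });
        System.out.println(status); // 202
    }
}
```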
> Fix flaky
> ConnectorRestartApiIntegrationTest.testMultiWorkerRestartOnlyConnector() test
> ---------------------------------------------------------------------------------------
>
> Key: KAFKA-15675
> URL: https://issues.apache.org/jira/browse/KAFKA-15675
> Project: Kafka
> Issue Type: Bug
> Reporter: Kirk True
> Assignee: Chris Egerton
> Priority: Major
> Labels: flaky-test
> Attachments: error.stacktrace.txt, error.stdout.txt
>
>
> This integration test is flaky around 9% of test runs. Source: [Gradle
> Enterprise test
> trends|https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=KAFKA&tests.container=org.apache.kafka.connect.integration.ConnectorRestartApiIntegrationTest&tests.test=testMultiWorkerRestartOnlyConnector].
> One failure had this message:
> {code:java}
> java.lang.AssertionError: Failed to stop connector and tasks within 120000ms
> {code}
> Please see the attachments for the stack trace and stdout log.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)