[
https://issues.apache.org/jira/browse/KAFKA-17515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17881175#comment-17881175
]
Yu-Lin Chen commented on KAFKA-17515:
-------------------------------------
I found two issues behind the flaky test failures:
# The local state is purged after kafkaStreams.close() times out (current
timeout = 5 sec), so the stream thread may still be shutting down when the
state directory is deleted. This is the root cause of the [flaky
test|https://ge.apache.org/s/havqcr7zu2tbk/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?expanded-stacktrace=WyIwIl0&page=eyJvdXRwdXQiOnsiMCI6MSwiMSI6Mn19&top-execution=1]
that occurred on Sep 10 2024 16:55:51 CST.
# Some tasks assigned to the first KafkaStreams instance (ks-1) are reassigned to
the second instance (ks-2) after rebalancing. However, the reassigned tasks had
not started restoring on ks-1, so they never enter the suspended state. This is
the root cause of the [flaky
test|https://ge.apache.org/s/hdpapdbvngcts/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]
that occurred on Sep 10 2024 11:44:34 CST.
Analysis results from the flaky logs (TL;DR):
For flaky test #1, the shutdown of ks-1 outlasted the close() timeout:
2024-09-10 04:35:14,227 [ks1-StreamThread-1] Informed to shut down
2024-09-10 04:35:19,349 [ks1-StreamThread-1] Shutdown completed
-> It took 5.122 secs, exceeding the 5 sec timeout
-> The retried test took 2.429 secs
-> In my local environment it took 0.2 secs
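The race in flaky #1 can be modeled with plain JDK threads. This is a minimal sketch, not Kafka Streams code: `ShutdownRaceSketch`, `startWorker`, `closeWithTimeout` and `purgeIfStopped` are hypothetical names. It mirrors the documented behavior of `KafkaStreams.close(Duration)`, which returns `false` when the timeout elapses before all stream threads stop, and assumes the purge should be skipped (or deferred) in that case rather than deleting a directory the thread may still be writing to.

```java
import java.time.Duration;

// Hypothetical model of flaky #1; not Kafka Streams code.
public class ShutdownRaceSketch {

    /** Starts a stand-in "stream thread" whose shutdown takes shutdownMillis. */
    public static Thread startWorker(long shutdownMillis) {
        Thread worker = new Thread(() -> {
            try {
                // Simulates the gap between "Informed to shut down" and "Shutdown completed".
                Thread.sleep(shutdownMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive after main returns
        worker.start();
        return worker;
    }

    /** Waits at most timeout; true only if the worker really stopped (like close(Duration)). */
    public static boolean closeWithTimeout(Thread worker, Duration timeout) {
        try {
            worker.join(timeout.toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    /** Runs the purge only after a confirmed shutdown; returns whether it ran. */
    public static boolean purgeIfStopped(boolean stopped, Runnable purge) {
        if (!stopped) {
            // The worker may still write to the state dir, which is how a
            // recursive delete can hit DirectoryNotEmptyException.
            return false;
        }
        purge.run();
        return true;
    }

    public static void main(String[] args) {
        // Shutdown slower than the timeout, as in flaky #1 (durations are arbitrary).
        Thread slow = startWorker(300);
        boolean slowStopped = closeWithTimeout(slow, Duration.ofMillis(50));
        System.out.println("slow: stopped=" + slowStopped
                + ", purged=" + purgeIfStopped(slowStopped, () -> System.out.println("purging")));

        // Shutdown well within the timeout, as in the fast local run.
        Thread fast = startWorker(0);
        boolean fastStopped = closeWithTimeout(fast, Duration.ofMillis(1000));
        System.out.println("fast: stopped=" + fastStopped
                + ", purged=" + purgeIfStopped(fastStopped, () -> System.out.println("purging")));
    }
}
```

In the real test, the same idea would mean checking the boolean returned by `close(Duration)` (or using a longer timeout) before calling `purgeLocalStreamsState`; this is one possible direction, not necessarily the eventual fix.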
For flaky test #2, below is the timeline of the failed run:
time 1: ks-1 (Assigned 0_0, 0_1, 0_2, 0_3, 0_4)
time 2: ks-1 (0_4 restore started)
time 3: ks-1 (0_2 restore started)
time 4: ks-1 (0_0 restore started)
time 5: ks-2 (After rebalancing, assigned 0_1, 0_3); restoration of these tasks
had not started on ks-1
time 6: ks-2 (0_1 restore started)
time 7: ks-2 (0_3 restore started)
In this case,
`kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends()` can never
succeed, since ks-1 never logs "Suspended from RUNNING" for tasks 0_1 and 0_3.
The log of a successful run of flaky test #2:
time 1: ks-1 (Assigned 0_0, 0_1, 0_2, 0_3, 0_4)
time 2: ks-1 (0_1 restore started)
time 3: ks-1 (0_3 restore started)
time 4: ks-1 (0_4 restore started)
time 5: ks-1 (0_2 restore started)
time 6: ks-1 (0_0 restore started)
time 7: ks-2 (After rebalancing, assigned 0_0, 0_2, 0_4)
time 8: ks-1 (task 0_4 suspended from RESTORING)
time 9: ks-1 (task 0_2 suspended from RESTORING)
time 10: ks-1 (task 0_0 suspended from RESTORING)
time 11: ks-2 (0_4 restore started)
time 12: ks-2 (0_2 restore started)
time 13: ks-2 (0_0 restore started)
`kafkaStreams1StateRestoreListener.awaitUntilRestorationSuspends()` passed at
time 8.
`kafkaStreams2StateRestoreListener.awaitUntilRestorationStarts()` passed at time
11.
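Why the failed run of flaky #2 hangs can be reproduced with a small model. This is a sketch under assumptions, not the test's real listener: `SuspendLatchSketch` and its methods are hypothetical, and it assumes `awaitUntilRestorationSuspends()` waits on a latch of count 1 that only counts down when a task that had already started restoring on ks-1 is suspended. Replaying the two timelines above shows that revoking 0_1 and 0_3 before their restoration starts never releases the latch.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of ks-1's restore listener in flaky #2; not the real test code.
public class SuspendLatchSketch {

    private final Set<String> restoring = new HashSet<>();
    // Assumption: the await is released by the first suspension callback.
    private final CountDownLatch suspended = new CountDownLatch(1);

    /** ks-1 logs "restore started" for a task. */
    public void restoreStarted(String taskId) {
        restoring.add(taskId);
    }

    /** A rebalance moves a task away from ks-1; only a restoring task fires a suspend callback. */
    public void revoke(String taskId) {
        if (restoring.remove(taskId)) {
            suspended.countDown(); // models the restore-suspended callback for this task
        }
    }

    /** Models awaitUntilRestorationSuspends(): true only if a suspension was seen in time. */
    public boolean awaitSuspends(long millis) {
        try {
            return suspended.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Replays one timeline: tasks that started restoring on ks-1, then tasks moved to ks-2. */
    public static boolean replay(List<String> started, List<String> revoked) {
        SuspendLatchSketch ks1 = new SuspendLatchSketch();
        started.forEach(ks1::restoreStarted);
        revoked.forEach(ks1::revoke);
        return ks1.awaitSuspends(100);
    }

    public static void main(String[] args) {
        // Failed run: 0_4, 0_2, 0_0 started on ks-1, but 0_1 and 0_3 moved to ks-2 first.
        boolean failed = replay(List.of("0_4", "0_2", "0_0"), List.of("0_1", "0_3"));
        // Successful run: all five started on ks-1, then 0_0, 0_2, 0_4 moved to ks-2.
        boolean passed = replay(List.of("0_1", "0_3", "0_4", "0_2", "0_0"),
                List.of("0_0", "0_2", "0_4"));
        System.out.println("failed timeline suspends: " + failed);
        System.out.println("successful timeline suspends: " + passed);
    }
}
```

With a latch of count 1, the model also agrees with the successful log: the await returns as soon as the first suspension (time 8) is observed.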
[~chia7712] If you're not working on this, I could submit a PR to fix the known
issues.
> Fix flaky
> RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-17515
> URL: https://issues.apache.org/jira/browse/KAFKA-17515
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Chia-Ping Tsai
> Assignee: Chia-Ping Tsai
> Priority: Major
>
> {code:java}
> Stacktrace
> java.nio.file.DirectoryNotEmptyException:
> /tmp/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ10111145955704739924-ks1/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ/0_0
> at
> java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:289)
> at
> java.base/sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:109)
> at java.base/java.nio.file.Files.deleteIfExists(Files.java:1191)
> at
> org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:898)
> at
> org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:870)
> at java.base/java.nio.file.Files.walkFileTree(Files.java:2803)
> at java.base/java.nio.file.Files.walkFileTree(Files.java:2857)
> at org.apache.kafka.common.utils.Utils.delete(Utils.java:870)
> at
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:266)
> at
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:278)
> at
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener(RestoreIntegrationTest.java:583)
> at java.base/java.lang.reflect.Method.invoke(Method.java:580)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)