[
https://issues.apache.org/jira/browse/KAFKA-17515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17882672#comment-17882672
]
Yu-Lin Chen commented on KAFKA-17515:
-------------------------------------
Another flaky failure was caused by slow initialization of the stream tasks.
Initialization took longer than 60 seconds, so no records were processed. ([flaky
link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1])
{code:java}
org.opentest4j.AssertionFailedError: Condition not met within timeout 60000.
Did not receive all [KeyValue(0, 0), KeyValue(1, 1), KeyValue(2, 2),
KeyValue(3, 3), KeyValue(4, 4), KeyValue(5, 5), KeyValue(6, 6), KeyValue(7, 7),
KeyValue(8, 8), KeyValue(9, 9), KeyValue(10, 10), KeyValue(11, 11),
KeyValue(12, 12), KeyValue(13, 13), KeyValue(14, 14), KeyValue(15, 15),
KeyValue(16, 16), KeyValue(17, 17), KeyValue(18, 18), KeyValue(19, 19),
KeyValue(20, 20), KeyValue(21, 21), KeyValue(22, 22), KeyValue(23, 23),
KeyValue(24, 24), KeyValue(25, 25), KeyValue(26, 26), KeyValue(27, 27),
KeyValue(28, 28), KeyValue(29, 29), KeyValue(30, 30), KeyValue(31, 31),
KeyValue(32, 32), KeyValue(33, 33), KeyValue(34, 34), KeyValue(35, 35),
KeyValue(36, 36), KeyValue(37, 37), KeyValue(38, 38), KeyValue(39, 39),
KeyValue(40, 40), KeyValue(41, 41), KeyValue(42, 42), KeyValue(43, 43),
KeyValue(44, 44), KeyValue(45, 45), KeyValue(46, 46), KeyValue(47, 47),
KeyValue(48, 48), KeyValue(49, 49), KeyValue(50, 50), KeyValue(51, 51),
KeyValue(52, 52), KeyValue(53, 53), KeyValue(54, 54), KeyValue(55, 55),
KeyValue(56, 56), KeyValue(57, 57), KeyValue(58, 58), KeyValue(59, 59),
KeyValue(60, 60), KeyValue(61, 61), KeyValue(62, 62), KeyValue(63, 63),
KeyValue(64, 64), KeyValue(65, 65), KeyValue(66, 66), KeyValue(67, 67),
KeyValue(68, 68), KeyValue(69, 69), KeyValue(70, 70), KeyValue(71, 71),
KeyValue(72, 72), KeyValue(73, 73), KeyValue(74, 74), KeyValue(75, 75),
KeyValue(76, 76), KeyValue(77, 77), KeyValue(78, 78), KeyValue(79, 79),
KeyValue(80, 80), KeyValue(81, 81), KeyValue(82, 82), KeyValue(83, 83),
KeyValue(84, 84), KeyValue(85, 85), KeyValue(86, 86), KeyValue(87, 87),
KeyValue(88, 88), KeyValue(89, 89), KeyValue(90, 90), KeyValue(91, 91),
KeyValue(92, 92), KeyValue(93, 93), KeyValue(94, 94), KeyValue(95, 95),
KeyValue(96, 96), KeyValue(97, 97), KeyValue(98, 98), KeyValue(99, 99)] records
from topic outputTopic (got []) ==> expected: <true> but was: <false> {code}
Logs:
{code:java}
[2024-09-12 18:50:29,644] INFO ... ks1-StreamThread-1] State transition from
CREATED to STARTING
[2024-09-12 18:50:38,263] INFO ... ks1-StreamThread-1] State transition from
STARTING to PARTITIONS_ASSIGNED
[2024-09-12 18:50:48,813] INFO ... ks1-StreamThread-1] task [0_3] Initialized
(org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:02,038] INFO ... ks1-StreamThread-1] task [0_0] Initialized
(org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:14,316] INFO ... ks1-StreamThread-1] task [0_4] Initialized
(org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:25,080] INFO ... ks1-StreamThread-1] task [0_1] Initialized
(org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:37,862] INFO ... ks1-StreamThread-1] task [0_2] Initialized
(org.apache.kafka.streams.processor.internals.StreamTask:277) {code}
In the flaky test run, rebalancing and the initialization of each task took
the following amounts of time:
# 8.619 sec for rebalancing
# 10.550 sec for initializing task [0_3]
# 13.225 sec for initializing task [0_0]
# 12.278 sec for initializing task [0_4]
# 10.764 sec for initializing task [0_1]
# 12.782 sec for initializing task [0_2]
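The per-step durations above are the differences between consecutive log timestamps. A minimal sketch of that arithmetic follows; the class and method names ({{InitTimings}}, {{stepMillis}}, {{totalMillis}}) are made up for illustration and are not part of Kafka, and the timestamps are copied from the log excerpt above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the Kafka code base.
class InitTimings {
    // Timestamp layout used by the log lines quoted in this comment.
    private static final DateTimeFormatter LOG_TIME =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Timestamps copied from the log excerpt: STARTING, PARTITIONS_ASSIGNED,
    // then the "Initialized" line of tasks 0_3, 0_0, 0_4, 0_1 and 0_2.
    static final List<String> LOG_TIMESTAMPS = List.of(
        "2024-09-12 18:50:29,644",
        "2024-09-12 18:50:38,263",
        "2024-09-12 18:50:48,813",
        "2024-09-12 18:51:02,038",
        "2024-09-12 18:51:14,316",
        "2024-09-12 18:51:25,080",
        "2024-09-12 18:51:37,862");

    // Elapsed milliseconds between each pair of consecutive timestamps.
    static List<Long> stepMillis(List<String> timestamps) {
        List<Long> steps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime prev = LocalDateTime.parse(timestamps.get(i - 1), LOG_TIME);
            LocalDateTime curr = LocalDateTime.parse(timestamps.get(i), LOG_TIME);
            steps.add(Duration.between(prev, curr).toMillis());
        }
        return steps;
    }

    // Sum of all step durations, i.e. last timestamp minus first timestamp.
    static long totalMillis(List<String> timestamps) {
        long total = 0;
        for (long step : stepMillis(timestamps)) {
            total += step;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Long> steps = stepMillis(LOG_TIMESTAMPS);
        for (int i = 0; i < steps.size(); i++) {
            System.out.println("step " + (i + 1) + ": " + steps.get(i) + " ms");
        }
        System.out.println("total: " + totalMillis(LOG_TIMESTAMPS) + " ms");
    }
}
```

Running the same calculation on the logs of other executions gives the totals to compare against the 60-second timeout.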
Together, these steps took 68.218 sec, exceeding the 60-second timeout. The
following lists the total task initialization time in this and other test
executions:
# Failed Execution
([Link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]):
68.218 sec (Started Sep 13 2024 at 02:50:08 CST)
# Passed Execution
([Link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=2]):
6.718 sec (Started Sep 13 2024 at 05:40:21 CST)
# Failed Execution
([Link|https://ge.apache.org/s/g72utse4pdln2/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]):
59.632 sec (Started Sep 14 2024 at 04:05:41 CST)
# Passed Execution
([Link|https://ge.apache.org/s/g72utse4pdln2/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=2]):
7.891 sec (Started Sep 14 2024 at 07:36:26 CST)
This issue has not occurred in the last three days, so I think we can fix the
known issue in the PR first.
We should keep monitoring the flaky rate over the next few days to see whether
we need to loosen the timeout again.
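To illustrate why slow startup alone can trip a fixed timeout, and what loosening it changes, here is a minimal, self-contained sketch of a polling wait. {{WaitSketch}} and {{waitFor}} are hypothetical stand-ins written for this comment; they are not the test utilities used by RestoreIntegrationTest, and the 200 ms delay merely simulates slow task initialization.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a "wait until condition or timeout" test helper.
class WaitSketch {
    // Polls the condition until it holds or timeoutMs has elapsed.
    // Returns true if the condition was met in time, false otherwise.
    static boolean waitFor(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs <= 0) {
                return false;
            }
            try {
                // Never sleep past the deadline.
                Thread.sleep(Math.min(pollMs, remainingMs));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        // Simulated slow startup: the condition only holds after 200 ms.
        BooleanSupplier ready = () -> System.nanoTime() - start >= 200_000_000L;

        // A 50 ms timeout expires before the simulated startup finishes.
        System.out.println("tight timeout met: " + waitFor(ready, 50, 10));
        // A looser 1000 ms timeout leaves enough room for the slow startup.
        System.out.println("loose timeout met: " + waitFor(ready, 1000, 10));
    }
}
```

A looser timeout only hides slow initialization rather than removing it, which is why the flaky rate is worth watching before changing the value.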
> Fix flaky
> RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-17515
> URL: https://issues.apache.org/jira/browse/KAFKA-17515
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Chia-Ping Tsai
> Assignee: Yu-Lin Chen
> Priority: Major
> Attachments: Reproduced screenshoot in my env (Loop 7).png
>
>
> {code:java}
> Stacktrace
> java.nio.file.DirectoryNotEmptyException:
> /tmp/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ10111145955704739924-ks1/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ/0_0
> at
> java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:289)
> at
> java.base/sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:109)
> at java.base/java.nio.file.Files.deleteIfExists(Files.java:1191)
> at
> org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:898)
> at
> org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:870)
> at java.base/java.nio.file.Files.walkFileTree(Files.java:2803)
> at java.base/java.nio.file.Files.walkFileTree(Files.java:2857)
> at org.apache.kafka.common.utils.Utils.delete(Utils.java:870)
> at
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:266)
> at
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:278)
> at
> org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener(RestoreIntegrationTest.java:583)
> at java.base/java.lang.reflect.Method.invoke(Method.java:580)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}