[
https://issues.apache.org/jira/browse/KAFKA-17515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17882672#comment-17882672
]
Yu-Lin Chen edited comment on KAFKA-17515 at 9/18/24 11:05 AM:
---------------------------------------------------------------
Another flaky failure was caused by slow initialization of the stream tasks.
Initialization took longer than 60 seconds, so no records were processed before
the timeout. ([flaky
link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1])
Error:
{code:java}
org.opentest4j.AssertionFailedError: Condition not met within timeout 60000.
Did not receive all [KeyValue(0, 0), KeyValue(1, 1), KeyValue(2, 2),
KeyValue(3, 3), KeyValue(4, 4), KeyValue(5, 5), KeyValue(6, 6), KeyValue(7, 7),
KeyValue(8, 8), KeyValue(9, 9), KeyValue(10, 10), KeyValue(11, 11),
KeyValue(12, 12), KeyValue(13, 13), KeyValue(14, 14), KeyValue(15, 15),
KeyValue(16, 16), KeyValue(17, 17), KeyValue(18, 18), KeyValue(19, 19),
KeyValue(20, 20), KeyValue(21, 21), KeyValue(22, 22), KeyValue(23, 23),
KeyValue(24, 24), KeyValue(25, 25), KeyValue(26, 26), KeyValue(27, 27),
KeyValue(28, 28), KeyValue(29, 29), KeyValue(30, 30), KeyValue(31, 31),
KeyValue(32, 32), KeyValue(33, 33), KeyValue(34, 34), KeyValue(35, 35),
KeyValue(36, 36), KeyValue(37, 37), KeyValue(38, 38), KeyValue(39, 39),
KeyValue(40, 40), KeyValue(41, 41), KeyValue(42, 42), KeyValue(43, 43),
KeyValue(44, 44), KeyValue(45, 45), KeyValue(46, 46), KeyValue(47, 47),
KeyValue(48, 48), KeyValue(49, 49), KeyValue(50, 50), KeyValue(51, 51),
KeyValue(52, 52), KeyValue(53, 53), KeyValue(54, 54), KeyValue(55, 55),
KeyValue(56, 56), KeyValue(57, 57), KeyValue(58, 58), KeyValue(59, 59),
KeyValue(60, 60), KeyValue(61, 61), KeyValue(62, 62), KeyValue(63, 63),
KeyValue(64, 64), KeyValue(65, 65), KeyValue(66, 66), KeyValue(67, 67),
KeyValue(68, 68), KeyValue(69, 69), KeyValue(70, 70), KeyValue(71, 71),
KeyValue(72, 72), KeyValue(73, 73), KeyValue(74, 74), KeyValue(75, 75),
KeyValue(76, 76), KeyValue(77, 77), KeyValue(78, 78), KeyValue(79, 79),
KeyValue(80, 80), KeyValue(81, 81), KeyValue(82, 82), KeyValue(83, 83),
KeyValue(84, 84), KeyValue(85, 85), KeyValue(86, 86), KeyValue(87, 87),
KeyValue(88, 88), KeyValue(89, 89), KeyValue(90, 90), KeyValue(91, 91),
KeyValue(92, 92), KeyValue(93, 93), KeyValue(94, 94), KeyValue(95, 95),
KeyValue(96, 96), KeyValue(97, 97), KeyValue(98, 98), KeyValue(99, 99)] records
from topic outputTopic (got []) ==> expected: <true> but was: <false> {code}
Logs:
{code:java}
[2024-09-12 18:50:29,644] INFO ... ks1-StreamThread-1] State transition from CREATED to STARTING
[2024-09-12 18:50:38,263] INFO ... ks1-StreamThread-1] State transition from STARTING to PARTITIONS_ASSIGNED
[2024-09-12 18:50:48,813] INFO ... ks1-StreamThread-1] task [0_3] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:02,038] INFO ... ks1-StreamThread-1] task [0_0] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:14,316] INFO ... ks1-StreamThread-1] task [0_4] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:25,080] INFO ... ks1-StreamThread-1] task [0_1] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277)
[2024-09-12 18:51:37,862] INFO ... ks1-StreamThread-1] task [0_2] Initialized (org.apache.kafka.streams.processor.internals.StreamTask:277) {code}
In the flaky test run, each step of stream task initialization took the
following amount of time:
# 8.619 sec for rebalancing
# 10.550 sec for initializing task [0_3]
# 13.225 sec for initializing task [0_0]
# 12.278 sec for initializing task [0_4]
# 10.764 sec for initializing task [0_1]
# 12.782 sec for initializing task [0_2]
In total, initialization took 68.218 seconds, which exceeds the 60-second
timeout. In most runs it completes in less than 10 seconds.
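The per-step durations can be cross-checked against the log timestamps. The following is only a minimal sketch: the class name {{StepDurations}} and its helper methods are hypothetical (they are not part of Kafka), and the timestamps are copied by hand from the log excerpt above.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka: derives step durations from log timestamps.
public class StepDurations {
    // Timestamps copied from the log excerpt, in the order they were logged.
    static final List<String> LOG_TIMESTAMPS = List.of(
        "2024-09-12 18:50:29,644", // CREATED -> STARTING
        "2024-09-12 18:50:38,263", // STARTING -> PARTITIONS_ASSIGNED
        "2024-09-12 18:50:48,813", // task [0_3] Initialized
        "2024-09-12 18:51:02,038", // task [0_0] Initialized
        "2024-09-12 18:51:14,316", // task [0_4] Initialized
        "2024-09-12 18:51:25,080", // task [0_1] Initialized
        "2024-09-12 18:51:37,862"  // task [0_2] Initialized
    );

    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Milliseconds elapsed between each pair of consecutive timestamps.
    static List<Long> stepMillis(List<String> timestamps) {
        List<Long> steps = new ArrayList<>();
        for (int i = 1; i < timestamps.size(); i++) {
            LocalDateTime previous = LocalDateTime.parse(timestamps.get(i - 1), FORMAT);
            LocalDateTime current = LocalDateTime.parse(timestamps.get(i), FORMAT);
            steps.add(Duration.between(previous, current).toMillis());
        }
        return steps;
    }

    // Milliseconds elapsed between the first and the last timestamp.
    static long totalMillis(List<String> timestamps) {
        LocalDateTime first = LocalDateTime.parse(timestamps.get(0), FORMAT);
        LocalDateTime last = LocalDateTime.parse(timestamps.get(timestamps.size() - 1), FORMAT);
        return Duration.between(first, last).toMillis();
    }

    public static void main(String[] args) {
        long timeoutMs = 60_000L; // timeout reported in the assertion error
        System.out.println("step durations (ms): " + stepMillis(LOG_TIMESTAMPS));
        long total = totalMillis(LOG_TIMESTAMPS);
        System.out.println("total (ms): " + total + ", exceeds timeout: " + (total > timeoutMs));
    }
}
```

The sum of the six steps matches the 68.218 sec reported for the failed execution, so the whole timeout budget was spent before any record could be processed.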
The following are the total task initialization times for failed and passed
executions of the flaky test:
# Failed Execution
([Link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]):
68.218 sec (Started Sep 13 2024 at 02:50:08 CST)
# Passed Execution
([Link|https://ge.apache.org/s/kzmyf3c3jo7fo/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=2]):
6.718 sec (Started Sep 13 2024 at 05:40:21 CST)
# Failed Execution
([Link|https://ge.apache.org/s/g72utse4pdln2/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=1]):
59.632 sec (Started Sep 14 2024 at 04:05:41 CST)
# Passed Execution
([Link|https://ge.apache.org/s/g72utse4pdln2/tests/task/:streams:test/details/org.apache.kafka.streams.integration.RestoreIntegrationTest/shouldInvokeUserDefinedGlobalStateRestoreListener()?top-execution=2]):
7.891 sec (Started Sep 14 2024 at 07:36:26 CST)
This issue has not occurred in the last three days, so I think we could fix the
known issue in the PR first.
We can keep monitoring the flaky rate over the next few days to see whether we
should loosen the timeout again.
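To illustrate why slow initialization surfaces as "Condition not met within timeout", here is a rough sketch of a polling wait with a configurable timeout. {{WaitSketch}} and {{waitFor}} are hypothetical names written for this comment; this is not the utility the test actually uses, only an assumption about the general pattern.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a polling wait; not Kafka's actual test utility.
public class WaitSketch {
    // Polls the condition until it holds or timeoutMs has elapsed.
    // Returns true if the condition was met in time, false otherwise.
    static boolean waitFor(BooleanSupplier condition, long timeoutMs, long pollMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // timed out before the condition became true
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Simulated slow startup: the condition only holds after roughly 200 ms.
        BooleanSupplier ready = () -> System.currentTimeMillis() - start >= 200;
        System.out.println(waitFor(ready, 50, 10));   // timeout shorter than the startup delay
        System.out.println(waitFor(ready, 2000, 10)); // looser timeout covers the delay
    }
}
```

With this pattern, loosening the timeout only helps if the condition eventually becomes true; it does not explain why initialization was slow in the first place.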
> Fix flaky
> RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener
> ----------------------------------------------------------------------------------
>
> Key: KAFKA-17515
> URL: https://issues.apache.org/jira/browse/KAFKA-17515
> Project: Kafka
> Issue Type: Bug
> Components: streams, unit tests
> Reporter: Chia-Ping Tsai
> Assignee: Yu-Lin Chen
> Priority: Major
> Attachments: Reproduced screenshoot in my env (Loop 7).png
>
>
> {code:java}
> Stacktrace
> java.nio.file.DirectoryNotEmptyException: /tmp/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ10111145955704739924-ks1/shouldInvokeUserDefinedGlobalStateRestoreListenerH0u0n9foRY_peZu4FqeGHQ/0_0
> at java.base/sun.nio.fs.UnixFileSystemProvider.implDelete(UnixFileSystemProvider.java:289)
> at java.base/sun.nio.fs.AbstractFileSystemProvider.deleteIfExists(AbstractFileSystemProvider.java:109)
> at java.base/java.nio.file.Files.deleteIfExists(Files.java:1191)
> at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:898)
> at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:870)
> at java.base/java.nio.file.Files.walkFileTree(Files.java:2803)
> at java.base/java.nio.file.Files.walkFileTree(Files.java:2857)
> at org.apache.kafka.common.utils.Utils.delete(Utils.java:870)
> at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:266)
> at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.purgeLocalStreamsState(IntegrationTestUtils.java:278)
> at org.apache.kafka.streams.integration.RestoreIntegrationTest.shouldInvokeUserDefinedGlobalStateRestoreListener(RestoreIntegrationTest.java:583)
> at java.base/java.lang.reflect.Method.invoke(Method.java:580)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
> {code}