nk1506 commented on code in PR #8725: URL: https://github.com/apache/iceberg/pull/8725#discussion_r1348266266
########## flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java: ########## @@ -148,9 +150,10 @@ public void testConsumeFromStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); Review Comment: same as above. ########## flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java: ########## @@ -111,9 +112,10 @@ public void testConsumeWithoutStartSnapshotId() throws Exception { TestSourceContext sourceContext = new TestSourceContext(latch); runSourceFunctionInTask(sourceContext, function); - Assert.assertTrue( - "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)); - Thread.sleep(1000L); + Awaitility.await() + .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS) + .pollInterval(100, TimeUnit.MILLISECONDS) + .until(() -> latch.getCount() == 0); Review Comment: I don't think this is a good idea. There are high changes to become flaky tests. At bad day, your latch count became 0 and threads are still doing remaining processing. but your worker thread will start compiling next lines. I think let `latch.await` be there . Just replace Thread.sleep with `Awaitility` . ########## flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java: ########## @@ -325,7 +326,7 @@ public void testSpecificSnapshotTimestamp() throws Exception { long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis(); // sleep for 2 ms to make sure snapshot1 has a higher timestamp value - Thread.sleep(2); + Awaitility.await().pollDelay(2, TimeUnit.MILLISECONDS).until(() -> System.currentTimeMillis()-snapshot0Timestamp>2); Review Comment: IMHO, let's not use `Awaitility` for just system clock delay. We can enhance [TestHelpers](https://github.com/apache/iceberg/blob/dd02085b12e9b509efef2d9a8b9fe730c0c60b03/api/src/test/java/org/apache/iceberg/TestHelpers.java#L57) like `public static long waitUntil1ms() { long previous = System.currentTimeMillis(); long current = System.currentTimeMillis(); while (current <= previous) { current = System.currentTimeMillis(); } return current; }` for any other `waitUntilAfter` we can use `Awaitility` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org