nk1506 commented on code in PR #8725:
URL: https://github.com/apache/iceberg/pull/8725#discussion_r1348266266


##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java:
##########
@@ -148,9 +150,10 @@ public void testConsumeFromStartSnapshotId() throws Exception {
       TestSourceContext sourceContext = new TestSourceContext(latch);
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      Awaitility.await()
+        .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)
+        .pollInterval(100, TimeUnit.MILLISECONDS)
+        .until(() -> latch.getCount() == 0);

Review Comment:
   Same concern as in the comment on `testConsumeWithoutStartSnapshotId`.



##########
flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/TestStreamingMonitorFunction.java:
##########
@@ -111,9 +112,10 @@ public void testConsumeWithoutStartSnapshotId() throws Exception {
       TestSourceContext sourceContext = new TestSourceContext(latch);
       runSourceFunctionInTask(sourceContext, function);
 
-      Assert.assertTrue(
-          "Should have expected elements.", latch.await(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS));
-      Thread.sleep(1000L);
+      Awaitility.await()
+        .atMost(WAIT_TIME_MILLIS, TimeUnit.MILLISECONDS)
+        .pollInterval(100, TimeUnit.MILLISECONDS)
+        .until(() -> latch.getCount() == 0);

Review Comment:
   I don't think this is a good idea. There is a high chance that these tests
become flaky.
   On a bad day, the latch count reaches 0 while the threads are still finishing
their remaining processing, but the test thread moves on and starts executing
the next lines.
   I think we should keep `latch.await` and replace only `Thread.sleep` with
`Awaitility`.
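
The suggestion above can be sketched roughly as follows. To stay self-contained, the sketch uses only the JDK, so `pollUntil` is a hypothetical stand-in for what an `Awaitility.await().atMost(...).until(...)` call would do in the real test. The class, the `finished` flag, and the simulated worker are assumptions made for illustration, not code from the PR.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Hypothetical illustration; none of these names come from the Iceberg test suite.
class LatchThenPollSketch {

  // Stand-in for Awaitility: polls the condition until it holds or the timeout elapses.
  static boolean pollUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis) {
    long start = System.nanoTime();
    long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
    while (!condition.getAsBoolean()) {
      if (System.nanoTime() - start >= timeoutNanos) {
        return false;
      }
      try {
        Thread.sleep(pollMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    }
    return true;
  }

  // Returns true when the latch opened and the remaining work finished in time.
  static boolean runScenario() {
    CountDownLatch latch = new CountDownLatch(1);
    AtomicBoolean finished = new AtomicBoolean(false);

    Thread worker = new Thread(() -> {
      latch.countDown(); // the expected element has been emitted
      try {
        Thread.sleep(50); // simulated processing that continues after the latch opens
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
      finished.set(true);
    });
    worker.start();

    try {
      // Step 1: keep the blocking wait (and its assertion) on the latch itself.
      boolean latched = latch.await(10_000, TimeUnit.MILLISECONDS);
      // Step 2: replace the fixed sleep with a wait on an observable condition.
      boolean done = pollUntil(finished::get, 10_000, 10);
      worker.join();
      return latched && done;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

The point of the two steps is that the latch reaching zero only shows that the elements were emitted. Whatever the fixed `Thread.sleep` was covering for still needs its own explicit condition.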



##########
flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java:
##########
@@ -325,7 +326,7 @@ public void testSpecificSnapshotTimestamp() throws Exception {
     long snapshot0Timestamp = tableResource.table().currentSnapshot().timestampMillis();
 
     // sleep for 2 ms to make sure snapshot1 has a higher timestamp value
-    Thread.sleep(2);
+    Awaitility.await().pollDelay(2, TimeUnit.MILLISECONDS).until(() -> System.currentTimeMillis()-snapshot0Timestamp>2);

Review Comment:
   IMHO, let's not use `Awaitility` just to wait for the system clock to
advance. We can enhance
[TestHelpers](https://github.com/apache/iceberg/blob/dd02085b12e9b509efef2d9a8b9fe730c0c60b03/api/src/test/java/org/apache/iceberg/TestHelpers.java#L57)
with something like
   `public static long waitUntil1ms() {
     long previous = System.currentTimeMillis();
     long current = System.currentTimeMillis();
     while (current <= previous) {
       current = System.currentTimeMillis();
     }
     return current;
   }`
   For any other `waitUntilAfter` case we can use `Awaitility`.
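
As a rough sketch of how such a clock helper might be applied to the test in this hunk, here is a variant that takes the timestamp to get past as a parameter. The class name `ClockWaitSketch` and the signature are hypothetical; this does not claim to match what `TestHelpers` actually provides.

```java
// Hypothetical helper; the class and method names are illustrative only.
final class ClockWaitSketch {

  private ClockWaitSketch() {}

  // Spins until the wall clock reads strictly later than timestampMillis,
  // then returns the observed time.
  static long waitUntilAfter(long timestampMillis) {
    long current = System.currentTimeMillis();
    while (current <= timestampMillis) {
      current = System.currentTimeMillis();
    }
    return current;
  }
}
```

In the test above, a call such as `ClockWaitSketch.waitUntilAfter(snapshot0Timestamp)` would then take the place of `Thread.sleep(2)`, assuming the only goal is for the next snapshot to get a strictly higher timestamp. A busy-wait like this is only reasonable for waits of a millisecond or so, since it keeps a core spinning until the clock moves.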



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

