stevenzwu commented on code in PR #10393:
URL: https://github.com/apache/iceberg/pull/10393#discussion_r1619139980


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java:
##########
@@ -48,10 +52,17 @@ public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIceberg
 
   @Override
   protected IcebergSource.Builder<RowData> sourceBuilder() {
-    return IcebergSource.<RowData>builder()
+    Configuration config = new Configuration();
+    config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+    return IcebergSource.forRowData()
         .tableLoader(sourceTableResource.tableLoader())
         .watermarkColumn("ts")
-        .project(TestFixtures.TS_SCHEMA);
+        .project(TestFixtures.TS_SCHEMA)
+        // Prevent combining splits
+        .set(

Review Comment:
   ah. we missed the bundling earlier.



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java:
##########
@@ -333,28 +336,28 @@ private static void restartTaskManager(Runnable afterFailAction, MiniCluster min
   private static class RecordCounterToFail {
 
     private static AtomicInteger records;
-    private static CompletableFuture<Void> fail;
+    private static CountDownLatch countDownLatch;
     private static CompletableFuture<Void> continueProcessing;
 
     private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
 
       records = new AtomicInteger();
-      fail = new CompletableFuture<>();
       continueProcessing = new CompletableFuture<>();
+      countDownLatch = new CountDownLatch(stream.getParallelism());

Review Comment:
   Curious how the countdown latch is used by the map operator. Will it be serialized, with a new cloned object deserialized on the operator/subtask side? If yes, I guess there is no difference from the completable future. If no, this ensures all subtasks are running and have processed some records before the failure is triggered.
   
   Could this be verified with a breakpoint in the IDE?
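   The distinction can be sketched outside of Flink (class and method names here are mine, not the PR's). In a single-JVM setup such as a MiniCluster, a `static` field is resolved per class rather than serialized with the map function's closure, so all simulated "subtasks" share one latch. Any single subtask can complete a `CompletableFuture`, but a `CountDownLatch` with count equal to the parallelism only reaches zero after every subtask has counted down:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

// Illustrative sketch only, assuming all "subtasks" run in one JVM
// (as Flink MiniCluster subtasks do), so they share the same objects.
public class LatchSemanticsSketch {

  // Returns the latch count after all simulated subtasks have run.
  static long run(int parallelism) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(parallelism);
    CompletableFuture<Void> future = new CompletableFuture<>();

    Thread[] subtasks = new Thread[parallelism];
    for (int i = 0; i < parallelism; i++) {
      subtasks[i] = new Thread(() -> {
        future.complete(null); // a single subtask is enough to complete the future
        latch.countDown();     // but the latch only reaches zero after all of them
      });
      subtasks[i].start();
    }
    for (Thread t : subtasks) {
      t.join();
    }
    // future.isDone() was already true after the first complete();
    // latch.getCount() is zero only once every subtask counted down.
    return latch.getCount();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(run(3));
  }
}
```

   If the closure were instead serialized and shipped to another JVM, each subtask would see its own deserialized copy, and neither coordination primitive would observe the others.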



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java:
##########
@@ -333,28 +336,28 @@ private static void restartTaskManager(Runnable afterFailAction, MiniCluster min
   private static class RecordCounterToFail {
 
     private static AtomicInteger records;
-    private static CompletableFuture<Void> fail;
+    private static CountDownLatch countDownLatch;
     private static CompletableFuture<Void> continueProcessing;
 
     private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
 
       records = new AtomicInteger();
-      fail = new CompletableFuture<>();
       continueProcessing = new CompletableFuture<>();
+      countDownLatch = new CountDownLatch(stream.getParallelism());
       return stream.map(
           record -> {
             boolean reachedFailPoint = records.incrementAndGet() > failAfter;
-            boolean notFailedYet = !fail.isDone();
+            boolean notFailedYet = countDownLatch.getCount() != 0;
             if (notFailedYet && reachedFailPoint) {
-              fail.complete(null);
+              countDownLatch.countDown();
               continueProcessing.get();
             }
             return record;
           });
     }
 
-    private static void waitToFail() throws ExecutionException, InterruptedException {
-      fail.get();
+    private static void waitToFail() throws InterruptedException {
Review Comment:
   I also just realized that this method is probably not accurate anymore with `testBoundedWithSavepoint`. Maybe it should be called `waitForCondition`? Also, the inner class `RecordCounterToFail` could be renamed to `RecordCounterToWait`.
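   Under the suggested rename, the helper might read like this minimal stand-alone sketch (the class and method names come from the suggestion above; the latch wiring mirrors the diff, but this is illustrative rather than the PR's actual code):

```java
import java.util.concurrent.CountDownLatch;

// Sketch of the rename suggested in the review: the method waits for a
// condition (all subtasks reached the point) instead of "failing".
class RecordCounterToWait {
  // In the PR this is sized by stream.getParallelism(); 1 here for the sketch.
  private static final CountDownLatch countDownLatch = new CountDownLatch(1);

  // Formerly waitToFail(): blocks until every subtask has counted down.
  static void waitForCondition() throws InterruptedException {
    countDownLatch.await();
  }

  public static void main(String[] args) throws InterruptedException {
    countDownLatch.countDown(); // simulate the map operator reaching the point
    waitForCondition();         // returns immediately once the count is zero
    System.out.println("done");
  }
}
```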



##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java:
##########
@@ -48,10 +52,17 @@ public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIceberg
 
   @Override
   protected IcebergSource.Builder<RowData> sourceBuilder() {
-    return IcebergSource.<RowData>builder()
+    Configuration config = new Configuration();
+    config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
Review Comment:
   is this config necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

