stevenzwu commented on code in PR #10393:
URL: https://github.com/apache/iceberg/pull/10393#discussion_r1619139980
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java:
##########
@@ -48,10 +52,17 @@ public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIceberg
   @Override
   protected IcebergSource.Builder<RowData> sourceBuilder() {
-    return IcebergSource.<RowData>builder()
+    Configuration config = new Configuration();
+    config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);
+    return IcebergSource.forRowData()
         .tableLoader(sourceTableResource.tableLoader())
         .watermarkColumn("ts")
-        .project(TestFixtures.TS_SCHEMA);
+        .project(TestFixtures.TS_SCHEMA)
+        // Prevent combining splits
+        .set(

Review Comment:
   Ah, we missed the split bundling earlier.

##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java:
##########
@@ -333,28 +336,28 @@ private static void restartTaskManager(Runnable afterFailAction, MiniCluster min
   private static class RecordCounterToFail {
     private static AtomicInteger records;
-    private static CompletableFuture<Void> fail;
+    private static CountDownLatch countDownLatch;
     private static CompletableFuture<Void> continueProcessing;
     private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
       records = new AtomicInteger();
-      fail = new CompletableFuture<>();
       continueProcessing = new CompletableFuture<>();
+      countDownLatch = new CountDownLatch(stream.getParallelism());

Review Comment:
   Curious how the countdown latch is used by the map operator. Will it be serialized, with a new cloned object deserialized on the operator/subtask side? If yes, I guess there is no difference from the CompletableFuture. If no, this ensures that all subtasks are running and have processed some records before the failure is triggered. Could this be verified with a breakpoint in the IDE?
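A plain-Java sketch (no Flink) of the serialization question, assuming the map lambda goes through Java serialization and the subtasks run in the same JVM and classloader as the test, as is typical for a MiniCluster-based test. Because `countDownLatch` is a static field and `wrapWithFailureAfter` is a static method, the lambda reads the latch through the class instead of capturing it, so a deserialized copy still counts down the test's own latch. The class and method names below (`StaticLatchRoundTrip`, `countDownTask`, `capturingTask`) are hypothetical. The second check shows that a *captured* `CountDownLatch` would not even serialize, since `CountDownLatch` does not implement `Serializable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.CountDownLatch;

public class StaticLatchRoundTrip {
  // Static, like RecordCounterToFail.countDownLatch: not part of the lambda's serialized state.
  static CountDownLatch latch;

  interface SerializableRunnable extends Runnable, Serializable {}

  // Reads the static field through the class; nothing is captured.
  static SerializableRunnable countDownTask() {
    return () -> latch.countDown();
  }

  // Captures the latch as a local value, so it would have to be serialized with the lambda.
  static SerializableRunnable capturingTask(CountDownLatch captured) {
    return () -> captured.countDown();
  }

  // Java-serializes and deserializes an object, standing in for shipping a function to a subtask.
  @SuppressWarnings("unchecked")
  static <T> T roundTrip(T obj) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(obj);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (T) in.readObject();
    }
  }

  // Runs a deserialized copy once and returns the count the "test" side observes.
  static long countAfterDeserializedCall(int parties) throws Exception {
    latch = new CountDownLatch(parties);
    SerializableRunnable copy = roundTrip(countDownTask());
    copy.run();
    return latch.getCount();
  }

  // True if serializing a lambda that captures a CountDownLatch fails.
  static boolean capturedLatchFailsToSerialize() throws Exception {
    try {
      roundTrip(capturingTask(new CountDownLatch(1)));
      return false;
    } catch (NotSerializableException e) {
      return true;
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(countAfterDeserializedCall(2));
    System.out.println(capturedLatchFailsToSerialize());
  }
}
```

If the same holds in the MiniCluster test, all subtasks share one static latch, just as they shared the old static `CompletableFuture`; the difference would then be that `await()` returns only after `parallelism` calls to `countDown()` rather than after the first completion.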
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java:
##########
@@ -333,28 +336,28 @@ private static void restartTaskManager(Runnable afterFailAction, MiniCluster min
   private static class RecordCounterToFail {
     private static AtomicInteger records;
-    private static CompletableFuture<Void> fail;
+    private static CountDownLatch countDownLatch;
     private static CompletableFuture<Void> continueProcessing;
     private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
       records = new AtomicInteger();
-      fail = new CompletableFuture<>();
       continueProcessing = new CompletableFuture<>();
+      countDownLatch = new CountDownLatch(stream.getParallelism());
       return stream.map(
           record -> {
             boolean reachedFailPoint = records.incrementAndGet() > failAfter;
-            boolean notFailedYet = !fail.isDone();
+            boolean notFailedYet = countDownLatch.getCount() != 0;
             if (notFailedYet && reachedFailPoint) {
-              fail.complete(null);
+              countDownLatch.countDown();
               continueProcessing.get();
             }
             return record;
           });
     }
-    private static void waitToFail() throws ExecutionException, InterruptedException {
-      fail.get();
+    private static void waitToFail() throws InterruptedException {

Review Comment:
   I also just realized that this method name is probably no longer accurate in the context of `testBoundedWithSavepoint`. Maybe it should be called `waitForCondition`? The inner class `RecordCounterToFail` could also be renamed to `RecordCounterToWait`.
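The wait semantics, using the names proposed above (`RecordCounterToWait`, `waitForCondition`), can be sketched in plain Java with one thread standing in for each subtask. This is a simulation under stated assumptions, not the Flink test: threads replace subtasks and a `UnaryOperator` replaces the Flink map function, so the blocking call is `join()` (no checked exception) instead of `get()`. The helpers `wrapWithWaitAfter`, `releaseSubtasks`, and `simulate` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;

public class RecordCounterToWait {
  private static AtomicInteger records;
  private static CountDownLatch countDownLatch;
  private static CompletableFuture<Void> continueProcessing;

  // Mirrors the map function in wrapWithFailureAfter, without Flink.
  static <T> UnaryOperator<T> wrapWithWaitAfter(int parallelism, int waitAfter) {
    records = new AtomicInteger();
    continueProcessing = new CompletableFuture<>();
    countDownLatch = new CountDownLatch(parallelism);
    return record -> {
      boolean reachedWaitPoint = records.incrementAndGet() > waitAfter;
      boolean notReachedYet = countDownLatch.getCount() != 0;
      if (notReachedYet && reachedWaitPoint) {
        countDownLatch.countDown(); // this "subtask" has reached the wait point
        continueProcessing.join(); // block until the test releases it
      }
      return record;
    };
  }

  // Returns only after every subtask has counted down once.
  static void waitForCondition() throws InterruptedException {
    countDownLatch.await();
  }

  static void releaseSubtasks() {
    continueProcessing.complete(null);
  }

  // Starts one thread per "subtask" and returns the total number of records passed through.
  static int simulate(int parallelism, int recordsPerSubtask, int waitAfter)
      throws InterruptedException {
    UnaryOperator<Integer> map = wrapWithWaitAfter(parallelism, waitAfter);
    AtomicInteger emitted = new AtomicInteger();
    List<Thread> subtasks = new ArrayList<>();
    for (int i = 0; i < parallelism; i++) {
      Thread subtask =
          new Thread(
              () -> {
                for (int r = 0; r < recordsPerSubtask; r++) {
                  map.apply(r);
                  emitted.incrementAndGet();
                }
              });
      subtasks.add(subtask);
      subtask.start();
    }
    waitForCondition(); // here the test would trigger the failure or take the savepoint
    releaseSubtasks();
    for (Thread subtask : subtasks) {
      subtask.join();
    }
    return emitted.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(simulate(3, 10, 5));
  }
}
```

In this sketch each thread has more records than `waitAfter`, so every thread reaches the wait point; a thread that never reached it would leave `waitForCondition()` blocked forever.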
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailoverWithWatermarkExtractor.java:
##########
@@ -48,10 +52,17 @@ public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIceberg
   @Override
   protected IcebergSource.Builder<RowData> sourceBuilder() {
-    return IcebergSource.<RowData>builder()
+    Configuration config = new Configuration();
+    config.setInteger(FlinkConfigOptions.SOURCE_READER_FETCH_BATCH_RECORD_COUNT, 128);

Review Comment:
   Is this config necessary?

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.