pvary commented on code in PR #10393: URL: https://github.com/apache/iceberg/pull/10393#discussion_r1619909229
##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java:
##########
@@ -333,28 +336,28 @@ private static void restartTaskManager(Runnable afterFailAction, MiniCluster min
   private static class RecordCounterToFail {
     private static AtomicInteger records;
-    private static CompletableFuture<Void> fail;
+    private static CountDownLatch countDownLatch;
     private static CompletableFuture<Void> continueProcessing;
     private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> stream, int failAfter) {
       records = new AtomicInteger();
-      fail = new CompletableFuture<>();
       continueProcessing = new CompletableFuture<>();
+      countDownLatch = new CountDownLatch(stream.getParallelism());
       return stream.map(
           record -> {
             boolean reachedFailPoint = records.incrementAndGet() > failAfter;
-            boolean notFailedYet = !fail.isDone();
+            boolean notFailedYet = countDownLatch.getCount() != 0;
             if (notFailedYet && reachedFailPoint) {
-              fail.complete(null);
+              countDownLatch.countDown();
               continueProcessing.get();
             }
             return record;
           });
     }
-    private static void waitToFail() throws ExecutionException, InterruptedException {
-      fail.get();
+    private static void waitToFail() throws InterruptedException {

Review Comment:
The issue was discovered during a job restart caused by a temporary issue in a downstream service. This is a failure recovery scenario, in many ways very similar to the failover cases tested here. Maybe we can rename the class to `TestIcebergSourceFailureRecovery`?

That said, I see the following issues with mixing refactoring and actual fixes in the same PR:
- It makes it hard in retrospect to understand what was the fix and what was the refactor. If we need to revisit the fix, because it is not working or because we hit a similar issue, we will have a hard time separating the two.
- If you have a stable branch for internal releases, you might not want to accept the risks inherent in the refactor, but you might still need the fix. Mixing fix and refactor code makes this hard.
- If you refactor the code that you are fixing and move it to another class, it is very hard to spot small mistakes or changes made in the same part of the code. This makes the reviewer's life hard and the process error-prone.
- Blocking a flaky test fix on refactoring could waste serious developer hours if other developers bump into the same issue and sink time into trying to find and fix the bug.

I am all for cleaning up code, and happy to do it whenever I have time, but I think that if it is not related to the actual fix/PR, it should be done only with a very limited scope. If we spot something that needs to be fixed, we should create a specific refactoring PR for it. I would be happy to review it whenever you make one.