pvary commented on code in PR #10393:
URL: https://github.com/apache/iceberg/pull/10393#discussion_r1619909229


##########
flink/v1.19/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceFailover.java:
##########
@@ -333,28 +336,28 @@ private static void restartTaskManager(Runnable 
afterFailAction, MiniCluster min
   private static class RecordCounterToFail {
 
     private static AtomicInteger records;
-    private static CompletableFuture<Void> fail;
+    private static CountDownLatch countDownLatch;
     private static CompletableFuture<Void> continueProcessing;
 
     private static <T> DataStream<T> wrapWithFailureAfter(DataStream<T> 
stream, int failAfter) {
 
       records = new AtomicInteger();
-      fail = new CompletableFuture<>();
       continueProcessing = new CompletableFuture<>();
+      countDownLatch = new CountDownLatch(stream.getParallelism());
       return stream.map(
           record -> {
             boolean reachedFailPoint = records.incrementAndGet() > failAfter;
-            boolean notFailedYet = !fail.isDone();
+            boolean notFailedYet = countDownLatch.getCount() != 0;
             if (notFailedYet && reachedFailPoint) {
-              fail.complete(null);
+              countDownLatch.countDown();
               continueProcessing.get();
             }
             return record;
           });
     }
 
-    private static void waitToFail() throws ExecutionException, 
InterruptedException {
-      fail.get();
+    private static void waitToFail() throws InterruptedException {

Review Comment:
   The issue was discovered when there was a job restart, because of a 
temporary issue in a downstream service.
   This is a failure recovery scenario, in many ways very similar to the 
Failover cases tested here. Maybe we can rename the class to 
`TestIcebergSourceFailureRecovery`?
   
   That said, I see the following issues mixing refactoring and actual fixes in 
the same PR:
   - Makes it hard in retrospect to understand what was the fix, and what was 
the refactor. If we need to revisit the fix, because it's not working, or we 
have a similar issue, then we will have hard time separating those. 
   - If you have a stable branch for internal releases, you might not want to 
accept the risks inherent to the refactor, but you might need the fix. Mixing 
fix and refactor code makes this hard.
   - If you refactor the code that you are fixing, and move it to another 
class, then it is very hard to spot small mistakes or changes done in the same 
part of the code. Makes the reviewer life hard, and the process error prone. 
   - Blocking flaky test fix on refactoring could waste serious developer 
hours, if other developers bumps into the same issue and sink time into trying 
to find and fix the bug.
   
   I am all for cleaning up code, and happy to do it whenever I have time, but 
I think that if it's not related to the actual fix/PR, then it should be done 
only with a very limited scope. If we spot something which needs to be fixed, 
we should create a specific, refactoring PR for it. I would be happy to review 
it, whenever you make one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to