amogh-jahagirdar commented on code in PR #14824:
URL: https://github.com/apache/iceberg/pull/14824#discussion_r2616509679


##########
core/src/main/java/org/apache/iceberg/rest/ScanTaskIterable.java:
##########
@@ -137,16 +131,55 @@ public void run() {
 
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
+        failure.compareAndSet(null, new RuntimeException("PlanWorker was 
interrupted", e));
+        shutdown.set(true);
       } catch (Exception e) {
-        throw new RuntimeException("Worker failed processing planTask", e);
+        failure.compareAndSet(null, new RuntimeException("Worker failed 
processing planTask", e));
+        shutdown.set(true);
       } finally {
-        int remaining = activeWorkers.decrementAndGet();
+        handleWorkerExit();
+      }
+    }
+
+    private void handleWorkerExit() {
+      int remainingActiveWorkers = activeWorkers.decrementAndGet();
+      boolean noWorkLeft =
+          remainingActiveWorkers == 0 && planTasks.isEmpty() && 
initialFileScanTasks.isEmpty();
+
+      // Only the last worker should signal completion to avoid multiple 
DUMMY_TASKs
+      if (noWorkLeft || (shutdown.get() && remainingActiveWorkers == 0)) {
+        signalCompletion();
+      } else if (remainingActiveWorkers == 0 && hasRemainingWork()) {
+        // This state should never be reached since this indicates that all 
workers failed and
+        // there's still in-flight work.  Workers which failed should've set 
the failure state and
+        // the consumer of the iterator would've already seen this state.
+        shutdown.set(true);
+        failure.compareAndSet(
+            null,
+            new IllegalStateException("Workers have exited but there is still 
work to be done"));
+      }
+    }
+
+    private boolean hasRemainingWork() {
+      return !planTasks.isEmpty() || !initialFileScanTasks.isEmpty();
+    }
+
+    private void signalCompletion() {
+      try {
+        taskQueue.put(DUMMY_TASK);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        shutdown.set(true);
+        failure.compareAndSet(
+            null, new RuntimeException("Interrupted while signaling 
completion", e));

Review Comment:
   Same as what I mentioned somewhere else, are we sure we want the consumer to 
throw this when the planning behind the scenes is interrupted?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to