amogh-jahagirdar commented on code in PR #14824:
URL: https://github.com/apache/iceberg/pull/14824#discussion_r2616509679
##########
core/src/main/java/org/apache/iceberg/rest/ScanTaskIterable.java:
##########
@@ -137,16 +131,55 @@ public void run() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ failure.compareAndSet(null, new RuntimeException("PlanWorker was
interrupted", e));
+ shutdown.set(true);
} catch (Exception e) {
- throw new RuntimeException("Worker failed processing planTask", e);
+ failure.compareAndSet(null, new RuntimeException("Worker failed
processing planTask", e));
+ shutdown.set(true);
} finally {
- int remaining = activeWorkers.decrementAndGet();
+ handleWorkerExit();
+ }
+ }
+
+ private void handleWorkerExit() {
+ int remainingActiveWorkers = activeWorkers.decrementAndGet();
+ boolean noWorkLeft =
+ remainingActiveWorkers == 0 && planTasks.isEmpty() &&
initialFileScanTasks.isEmpty();
+
+ // Only the last worker should signal completion to avoid multiple
DUMMY_TASKs
+ if (noWorkLeft || (shutdown.get() && remainingActiveWorkers == 0)) {
+ signalCompletion();
+ } else if (remainingActiveWorkers == 0 && hasRemainingWork()) {
+ // This state should never be reached since this indicates that all
workers failed and
+ // there's still in-flight work. Workers which failed should've set
the failure state and
+ // the consumer of the iterator would've already seen this state.
+ shutdown.set(true);
+ failure.compareAndSet(
+ null,
+ new IllegalStateException("Workers have exited but there is still
work to be done"));
+ }
+ }
+
+ private boolean hasRemainingWork() {
+ return !planTasks.isEmpty() || !initialFileScanTasks.isEmpty();
+ }
+
+ private void signalCompletion() {
+ try {
+ taskQueue.put(DUMMY_TASK);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ shutdown.set(true);
+ failure.compareAndSet(
+ null, new RuntimeException("Interrupted while signaling
completion", e));
Review Comment:
Same as what I mentioned somewhere else, are we sure we want the consumer to
throw this when the planning behind the scenes is interrupted?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]