amogh-jahagirdar commented on code in PR #14824:
URL: https://github.com/apache/iceberg/pull/14824#discussion_r2616484049
##########
core/src/main/java/org/apache/iceberg/rest/ScanTaskIterable.java:
##########
@@ -137,16 +131,50 @@ public void run() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ failure.compareAndSet(null, new RuntimeException("PlanWorker was interrupted", e));
} catch (Exception e) {
- throw new RuntimeException("Worker failed processing planTask", e);
+ failure.compareAndSet(null, new RuntimeException("Worker failed processing planTask", e));
} finally {
- int remaining = activeWorkers.decrementAndGet();
+ handleWorkerExit();
+ }
+ }
+
+ private void handleWorkerExit() {
+ int remainingActiveWorkers = activeWorkers.decrementAndGet();
+ boolean noWorkLeft =
+ remainingActiveWorkers == 0 && planTasks.isEmpty() && initialFileScanTasks.isEmpty();
+
+ if (noWorkLeft || shutdown.get()) {
+ signalCompletion();
+ } else if (remainingActiveWorkers == 0 && hasRemainingWork()) {
+ // if there is still work to be done but no active workers, it means all workers have
+ // failed. no need to respawn workers since the iterator will fail on next()
+ // and throw back the failure.
+ failure.compareAndSet(
+ null,
+ new IllegalStateException("Workers have exited but there is still work to be done"));
Review Comment:
Actually, is there any state in which worker 1 should keep working after
worker 2 fails? We don't really care about fault tolerance here, so I think we
can simplify a lot of this even further.
As soon as a worker fails, signal the other threads to stop doing work. I'd
do this via shutdown. It's maybe slightly suboptimal for the limit case, where
one worker could have satisfied the limit even though the other one failed, but
I think that's a narrow case.
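A minimal sketch of that fail-fast idea, not the PR's actual code: `FailFastWorkers`, its integer task queue, and the `failOnTask` parameter are all hypothetical stand-ins for `ScanTaskIterable` and its plan tasks. It only assumes the same shape as the diff above, with a shared `failure` reference and a `shutdown` flag. The first worker that fails records the failure and sets `shutdown`, and every other worker checks the flag before taking more work.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the worker pool; names are illustrative only.
class FailFastWorkers {
  private final Queue<Integer> tasks = new ConcurrentLinkedQueue<>();
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  private final AtomicReference<RuntimeException> failure = new AtomicReference<>();
  private final AtomicInteger processed = new AtomicInteger();

  FailFastWorkers(int taskCount) {
    for (int i = 0; i < taskCount; i++) {
      tasks.add(i);
    }
  }

  // Worker loop: stops when the queue is drained or any worker has failed.
  private void runWorker(int failOnTask) {
    try {
      while (!shutdown.get()) {
        Integer task = tasks.poll();
        if (task == null) {
          return; // no work left
        }
        if (task == failOnTask) {
          throw new IllegalStateException("task " + task + " failed");
        }
        processed.incrementAndGet();
      }
    } catch (Exception e) {
      // Keep only the first failure, then tell every other worker to stop.
      failure.compareAndSet(null, new RuntimeException("Worker failed processing planTask", e));
      shutdown.set(true);
    }
  }

  // Runs the workers to completion and returns the first failure, or null.
  RuntimeException run(int workers, int failOnTask) {
    ExecutorService pool = Executors.newFixedThreadPool(workers);
    for (int i = 0; i < workers; i++) {
      pool.execute(() -> runWorker(failOnTask));
    }
    pool.shutdown();
    try {
      pool.awaitTermination(10, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      failure.compareAndSet(null, new RuntimeException("Interrupted while waiting for workers", e));
      shutdown.set(true);
    }
    return failure.get();
  }

  int processedCount() {
    return processed.get();
  }

  boolean isShutdown() {
    return shutdown.get();
  }
}
```

With this shape there is no separate "workers exited but work remains" branch to reason about: a failure always implies `shutdown`, so the exit path only needs to distinguish a clean drain from a shutdown.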
##########
core/src/main/java/org/apache/iceberg/rest/ScanTaskIterable.java:
##########
@@ -137,16 +131,50 @@ public void run() {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ failure.compareAndSet(null, new RuntimeException("PlanWorker was interrupted", e));
} catch (Exception e) {
- throw new RuntimeException("Worker failed processing planTask", e);
+ failure.compareAndSet(null, new RuntimeException("Worker failed processing planTask", e));
} finally {
- int remaining = activeWorkers.decrementAndGet();
+ handleWorkerExit();
+ }
+ }
+
+ private void handleWorkerExit() {
+ int remainingActiveWorkers = activeWorkers.decrementAndGet();
+ boolean noWorkLeft =
+ remainingActiveWorkers == 0 && planTasks.isEmpty() && initialFileScanTasks.isEmpty();
+
+ if (noWorkLeft || shutdown.get()) {
+ signalCompletion();
+ } else if (remainingActiveWorkers == 0 && hasRemainingWork()) {
+ // if there is still work to be done but no active workers, it means all workers have
+ // failed. no need to respawn workers since the iterator will fail on next()
+ // and throw back the failure.
+ failure.compareAndSet(
+ null,
+ new IllegalStateException("Workers have exited but there is still work to be done"));
Review Comment:
I think having this sanity check is fine, but I'd reword the comment to
reflect that it's an unexpected state:
```
// This state should never be reached, since it means that all workers failed
// while there is still in-flight work. Any worker that failed should have set
// the failure state, and the consumer of the iterator would have already seen
// that state.
```