jja725 opened a new pull request, #1601:
URL: https://github.com/apache/datafusion-ballista/pull/1601
## Summary
Implements the unchecked **"support executor failure"** task from the AQE
epic #1359. Brings `AdaptiveExecutionGraph` to parity with
`StaticExecutionGraph`'s executor-loss recovery so re-running stages and
rolled-back stages work end-to-end under AQE.
## Problem
`AdaptiveExecutionGraph::reset_stages_on_lost_executor` was structurally a
copy of the static-graph version, but under AQE it was a no-op in practice,
for three reasons:
1. AQE's `create_resolved_stage` initialises `stage.inputs` to an empty
`HashMap` (intentional — partition locations live in the planner's plan tree
under `ExchangeExec.shuffle_partitions`, not in `stage.inputs`). The
static-graph rollback walk thus found nothing.
2. The `AdaptivePlanner`'s side state (`runnable_stage_cache`,
`runnable_stage_output`) was never told about lost executors, so a re-running
successful stage failed with `Can't find active stage to update stage outputs`.
3. `ExchangeExec` / `AdaptiveDatafusionExec` in the live plan tree retained
`shuffle_partitions = Some(...)` even after their owning stage rolled back, so
`find_runnable_exchanges` would skip them.
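To make point 1 concrete, here is a minimal sketch of the inherited rollback walk; the types and field names are simplified assumptions for illustration, not Ballista's actual API:

```rust
// Simplified sketch: why the static-graph rollback walk finds nothing
// under AQE. Types/fields here are illustrative assumptions only.
use std::collections::HashMap;

struct StageInput {
    // executor_id -> partitions held on that executor
    partition_locations: HashMap<String, Vec<usize>>,
}

struct Stage {
    // keyed by input stage id; AQE initialises this to an empty map
    inputs: HashMap<usize, StageInput>,
}

/// The static-graph walk: collect input stage ids whose shuffle data
/// lived on the lost executor. With AQE's always-empty `inputs`, this
/// returns nothing, so no stage is ever rolled back.
fn stages_to_roll_back(stage: &Stage, lost_executor: &str) -> Vec<usize> {
    stage
        .inputs
        .iter()
        .filter(|(_, input)| input.partition_locations.contains_key(lost_executor))
        .map(|(id, _)| *id)
        .collect()
}

fn main() {
    // AQE case: empty inputs, so the walk is a no-op.
    let aqe_stage = Stage { inputs: HashMap::new() };
    assert!(stages_to_roll_back(&aqe_stage, "exec-a").is_empty());
}
```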
## Changes
- **`aqe/execution_plan.rs`** — new `pub(crate) fn
reset_locations_on_lost_executor(&self, executor_id) -> Option<usize>` on both
`ExchangeExec` and `AdaptiveDatafusionExec`. Clears `shuffle_partitions` back
to `None` if any location matches the lost executor; returns the affected
stage_id.
- **`aqe/planner.rs`** — new `pub(super) fn reset_on_lost_executor(&mut
self, executor_id) -> Result<HashSet<usize>>` on `AdaptivePlanner`. Walks the
live plan tree, calls the per-exec reset, restores `runnable_stage_cache` /
`runnable_stage_output` for affected stages, re-runs `replan_stages()`, and
returns the set of affected stage_ids.
- **`aqe/mod.rs`** — `reset_stages_internal` now uses the planner's affected
set to:
1. Reset task_infos and transition matching `Successful` stages back to
`Running`.
2. Drop any `Resolved`/`Running` stages whose embedded plan reads from an
affected stage (their `ShuffleReaderExec` entries hold stale partition
locations). The planner regenerates them via `actionable_stages` once upstream
reruns complete.
- **`aqe/mod.rs`** — `update_task_status` now warns (instead of erroring)
when a task status arrives for a stage that's no longer in `self.stages`. This
is expected after the dependent-stage drop above.
- **Tests** — port four executor-failure tests from the static-graph suite
into a new `aqe/test/executor_failure.rs`:
- `test_reset_completed_stage_executor_lost`
- `test_reset_resolved_stage_executor_lost`
- `test_task_update_after_reset_stage` (incl. idempotency check)
- `test_long_delayed_failed_task_after_executor_lost`
- **Helpers** — new `test_aqe_aggregation_plan` / `test_aqe_join_plan`
helpers in `aqe/test/mod.rs` that build an `AdaptiveExecutionGraph` from a
SQL-shaped plan, mirroring the static-graph builders.
- **Doc cleanup** — drop the stale `/// - it does not cover executor
failure` line from the `AdaptiveExecutionGraph` docstring.
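The core of the new reset path can be sketched as follows; this is a self-contained illustration with assumed struct layouts (the real `ExchangeExec` takes `&self` and holds richer partition metadata), not the PR's actual implementation:

```rust
// Hedged sketch of the reset flow described above: clear cached shuffle
// locations on exchanges that read from the lost executor, and collect
// the affected stage ids for rollback. All names/fields are assumptions.
use std::collections::HashSet;

#[derive(Clone)]
struct PartitionLocation {
    executor_id: String,
}

struct ExchangeExec {
    stage_id: usize,
    shuffle_partitions: Option<Vec<PartitionLocation>>,
}

impl ExchangeExec {
    /// Clear `shuffle_partitions` back to `None` if any cached location
    /// lived on the lost executor; return the owning stage id so the
    /// caller can roll that stage back.
    fn reset_locations_on_lost_executor(&mut self, executor_id: &str) -> Option<usize> {
        let hit = self
            .shuffle_partitions
            .as_ref()
            .map_or(false, |locs| locs.iter().any(|l| l.executor_id == executor_id));
        if hit {
            self.shuffle_partitions = None;
            Some(self.stage_id)
        } else {
            None
        }
    }
}

/// Planner-level walk: reset every exchange and return the affected set,
/// which the graph then uses to re-run or drop stages.
fn reset_on_lost_executor(execs: &mut [ExchangeExec], executor_id: &str) -> HashSet<usize> {
    execs
        .iter_mut()
        .filter_map(|e| e.reset_locations_on_lost_executor(executor_id))
        .collect()
}

fn main() {
    let mut execs = vec![
        ExchangeExec {
            stage_id: 1,
            shuffle_partitions: Some(vec![PartitionLocation { executor_id: "exec-a".into() }]),
        },
        ExchangeExec {
            stage_id: 2,
            shuffle_partitions: Some(vec![PartitionLocation { executor_id: "exec-b".into() }]),
        },
    ];
    let affected = reset_on_lost_executor(&mut execs, "exec-a");
    assert!(affected.contains(&1));
    assert!(execs[0].shuffle_partitions.is_none());
}
```

In the sketch the affected set feeds directly into the stage transitions described above (reset task_infos, `Successful` back to `Running`, drop dependent `Resolved`/`Running` stages).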
## Test Plan
- [x] `cargo test -p ballista-scheduler` — 76 passed, 0 failed, 1 ignored
(pre-existing).
- [x] `cargo clippy -p ballista-scheduler --tests` — clean.
- [x] `cargo fmt -p ballista-scheduler -- --check` — clean.
- [x] All four ported tests pass; the static-graph executor-failure tests
still pass; existing AQE tests still pass.
## Out of Scope
- AQE-specific re-optimization triggered *by* failure (e.g., switching join
strategy after losing data).
- Partial per-partition re-execution beyond what static graph already does.
- Dynamic shuffle coalescing (separate task on the epic).
Refs: #1359
Design and implementation plan committed under `docs/superpowers/specs/` and
`docs/superpowers/plans/`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]