jja725 opened a new pull request, #1601:
URL: https://github.com/apache/datafusion-ballista/pull/1601

   ## Summary
   
   Implements the unchecked **"support executor failure"** task from the AQE 
epic #1359. Brings `AdaptiveExecutionGraph` to parity with 
`StaticExecutionGraph`'s executor-loss recovery, so that re-run stages and 
rolled-back stages work end-to-end under AQE.
   
   ## Problem
   
   `AdaptiveExecutionGraph::reset_stages_on_lost_executor` was structurally a 
copy of the static-graph version, but it was a no-op in practice for AQE 
because:
   
   1. AQE's `create_resolved_stage` initialises `stage.inputs` to an empty 
`HashMap` (intentional — partition locations live in the planner's plan tree 
under `ExchangeExec.shuffle_partitions`, not in `stage.inputs`). The 
static-graph rollback walk therefore found nothing to roll back; the sketch 
below illustrates the mismatch.
   2. The `AdaptivePlanner`'s side state (`runnable_stage_cache`, 
`runnable_stage_output`) was never told about lost executors, so re-running a 
previously successful stage failed with `Can't find active stage to update 
stage outputs`.
   3. `ExchangeExec` / `AdaptiveDatafusionExec` in the live plan tree retained 
`shuffle_partitions = Some(...)` even after their owning stage rolled back, so 
`find_runnable_exchanges` would skip them.
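
   To make point 1 concrete, here is a minimal sketch of why a rollback walk 
over `stage.inputs` is a no-op under AQE. The types (`PartitionLocation`, 
`StageInput`, `Stage`) and the walk itself are illustrative stand-ins, not 
the actual Ballista structs:

   ```rust
   use std::collections::HashMap;

   struct PartitionLocation {
       executor_id: String,
   }

   struct StageInput {
       partition_locations: Vec<PartitionLocation>,
   }

   struct Stage {
       // Under AQE this map is intentionally left empty: partition locations
       // live on the exchange nodes in the planner's plan tree instead.
       inputs: HashMap<usize, StageInput>,
   }

   /// Static-graph style walk: collect input stages that held partitions on
   /// the lost executor. For an AQE stage this is always empty.
   fn stages_to_roll_back(stage: &Stage, lost_executor: &str) -> Vec<usize> {
       stage
           .inputs
           .iter()
           .filter(|(_, input)| {
               input
                   .partition_locations
                   .iter()
                   .any(|loc| loc.executor_id == lost_executor)
           })
           .map(|(stage_id, _)| *stage_id)
           .collect()
   }

   fn main() {
       let aqe_stage = Stage { inputs: HashMap::new() };
       // Nothing found to roll back, regardless of where the lost executor's
       // shuffle data actually was.
       assert!(stages_to_roll_back(&aqe_stage, "executor-1").is_empty());
   }
   ```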
   
   ## Changes
   
   - **`aqe/execution_plan.rs`** — new `pub(crate) fn 
reset_locations_on_lost_executor(&self, executor_id) -> Option<usize>` on both 
`ExchangeExec` and `AdaptiveDatafusionExec`. Clears `shuffle_partitions` back 
to `None` if any location matches the lost executor; returns the affected 
stage_id.
   - **`aqe/planner.rs`** — new `pub(super) fn reset_on_lost_executor(&mut 
self, executor_id) -> Result<HashSet<usize>>` on `AdaptivePlanner`. Walks the 
live plan tree, calls the per-exec reset, restores `runnable_stage_cache` / 
`runnable_stage_output` for affected stages, re-runs `replan_stages()`, and 
returns the set of affected stage_ids. Both new reset hooks are sketched 
together after this list.
   - **`aqe/mod.rs`** — `reset_stages_internal` now uses the planner's affected 
set to:
     1. Reset their task_infos and transition any affected `Successful` stages 
back to `Running`.
     2. Drop any `Resolved`/`Running` stages whose embedded plan reads from an 
affected stage (their `ShuffleReaderExec` entries hold stale partition 
locations). The planner regenerates them via `actionable_stages` once upstream 
reruns complete.
   - **`aqe/mod.rs`** — `update_task_status` now warns (instead of erroring) 
when a task status arrives for a stage that is no longer in `self.stages`; 
this is expected after the dependent-stage drop above. The reset flow and this 
softened path are sketched at the end of this list.
   - **Tests** — port four executor-failure tests from the static-graph suite 
into a new `aqe/test/executor_failure.rs`:
     - `test_reset_completed_stage_executor_lost`
     - `test_reset_resolved_stage_executor_lost`
     - `test_task_update_after_reset_stage` (incl. idempotency check)
     - `test_long_delayed_failed_task_after_executor_lost`
   - **Helpers** — new `test_aqe_aggregation_plan` / `test_aqe_join_plan` 
helpers in `aqe/test/mod.rs` that build an `AdaptiveExecutionGraph` from a 
SQL-shaped plan, mirroring the static-graph builders.
   - **Doc cleanup** — drop the stale `/// - it does not cover executor 
failure` line from the `AdaptiveExecutionGraph` docstring.
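
   A combined sketch of the two new reset hooks, under stated assumptions: 
`ExchangeExec`, `shuffle_partitions`, and both function names come from this 
PR, while the field types, the `Mutex` (standing in for whatever interior 
mutability the real `&self` receiver relies on), `PartitionLocation`, and the 
`PlanNode` tree are illustrative; the `runnable_stage_cache` / 
`runnable_stage_output` restoration and the `replan_stages()` re-run are 
elided to a comment.

   ```rust
   use std::collections::HashSet;
   use std::sync::Mutex;

   // Illustrative: location of one shuffle partition; only the executor id
   // matters for this sketch.
   struct PartitionLocation {
       executor_id: String,
   }

   // Simplified stand-in for ExchangeExec (AdaptiveDatafusionExec gets the
   // same method). The Mutex is an assumption modelling the `&self` receiver
   // in the PR's signature.
   struct ExchangeExec {
       stage_id: usize,
       // Some(locations) once the producing stage has completed; None while it
       // still needs to run, which is what find_runnable_exchanges looks for.
       shuffle_partitions: Mutex<Option<Vec<PartitionLocation>>>,
   }

   impl ExchangeExec {
       /// Clear shuffle_partitions back to None if any location lives on the
       /// lost executor; return the owning stage's id so the caller can roll
       /// that stage back.
       fn reset_locations_on_lost_executor(&self, executor_id: &str) -> Option<usize> {
           let mut partitions = self.shuffle_partitions.lock().unwrap();
           let hit = partitions
               .as_ref()
               .is_some_and(|locs| locs.iter().any(|l| l.executor_id == executor_id));
           if hit {
               *partitions = None;
               Some(self.stage_id)
           } else {
               None
           }
       }
   }

   // Illustrative stand-in for the live plan tree the planner walks.
   enum PlanNode {
       Exchange(ExchangeExec),
       Other(Vec<PlanNode>),
   }

   /// Planner-side reset: walk the plan tree, apply the per-node reset, and
   /// collect the affected stage ids. The real method also restores
   /// runnable_stage_cache / runnable_stage_output for those stages and
   /// re-runs replan_stages() before returning.
   fn reset_on_lost_executor(root: &PlanNode, executor_id: &str) -> HashSet<usize> {
       let mut affected = HashSet::new();
       let mut stack = vec![root];
       while let Some(node) = stack.pop() {
           match node {
               PlanNode::Exchange(ex) => {
                   if let Some(stage_id) = ex.reset_locations_on_lost_executor(executor_id) {
                       affected.insert(stage_id);
                   }
               }
               PlanNode::Other(children) => stack.extend(children.iter()),
           }
       }
       affected
   }
   ```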
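
   And a sketch of how `reset_stages_internal` consumes that affected set, 
including the softened `update_task_status` path. `StageState`, `upstream`, 
`TaskInfo`, and `on_task_status` are illustrative stand-ins (the real graph 
discovers upstream reads through `ShuffleReaderExec` entries in each stage's 
embedded plan):

   ```rust
   use std::collections::{HashMap, HashSet};

   #[derive(PartialEq)]
   enum StageState {
       Resolved,
       Running,
       Successful,
   }

   struct TaskInfo;

   struct Stage {
       state: StageState,
       // Stage ids this stage's embedded plan reads shuffle data from.
       upstream: HashSet<usize>,
       task_infos: Vec<Option<TaskInfo>>,
   }

   fn reset_stages_internal(stages: &mut HashMap<usize, Stage>, affected: &HashSet<usize>) {
       // 1. Affected Successful stages are re-run: wipe task bookkeeping and
       //    flip them back to Running.
       for (stage_id, stage) in stages.iter_mut() {
           if affected.contains(stage_id) && stage.state == StageState::Successful {
               stage.task_infos.iter_mut().for_each(|t| *t = None);
               stage.state = StageState::Running;
           }
       }
       // 2. Drop Resolved/Running stages whose plans read stale partition
       //    locations from an affected stage; the planner regenerates them via
       //    actionable_stages once the upstream reruns complete.
       stages.retain(|stage_id, stage| {
           let stale = !affected.contains(stage_id)
               && matches!(stage.state, StageState::Resolved | StageState::Running)
               && stage.upstream.iter().any(|up| affected.contains(up));
           !stale
       });
   }

   /// After the drop above, a late task status for a vanished stage is
   /// expected, so it only warns (a plain eprintln! stands in for the logger).
   fn on_task_status(stages: &HashMap<usize, Stage>, stage_id: usize) {
       if !stages.contains_key(&stage_id) {
           eprintln!("warn: task status for stage {stage_id} not in self.stages; expected after a dependent-stage drop");
       }
   }
   ```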
   
   ## Test Plan
   
   - [x] `cargo test -p ballista-scheduler` — 76 passed, 0 failed, 1 ignored 
(pre-existing).
   - [x] `cargo clippy -p ballista-scheduler --tests` — clean.
   - [x] `cargo fmt -p ballista-scheduler -- --check` — clean.
   - [x] All four ported tests pass; the static-graph executor-failure tests 
still pass; existing AQE tests still pass.
   
   ## Out of Scope
   
   - AQE-specific re-optimization triggered *by* failure (e.g., switching join 
strategy after losing data).
   - Partial per-partition re-execution beyond what static graph already does.
   - Dynamic shuffle coalescing (separate task on the epic).
   
   Refs: #1359
   
   Design and implementation plan committed under `docs/superpowers/specs/` and 
`docs/superpowers/plans/`.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

