Dandandan opened a new pull request, #21788:
URL: https://github.com/apache/datafusion/pull/21788

   ## Which issue does this PR close?
   
   - Closes #.
   
   ## Rationale for this change
   
   Each `RepartitionExec` input partition currently spawns **two** tokio tasks:
   
   1. `pull_from_input` — the worker that pulls batches from the input stream 
and routes them to per-output senders.
   2. `wait_for_task` — a shepherd that only awaits the worker's `JoinHandle` 
and forwards EOS (`None`) or the joined error to each output sender.
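The two-task shape can be sketched as a synchronous, std-only analogue. It assumes OS threads stand in for tokio tasks, `std::sync::mpsc` channels stand in for the per-output senders, and a `usize` stands in for a `RecordBatch`. `pull_from_input` and `spawn_worker_and_shepherd` below are hypothetical simplifications, not DataFusion's code.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread::{self, JoinHandle};

// Hypothetical stand-in for a RecordBatch: just a row count.
type Batch = usize;
// Per-output message: Some(Ok(batch)), Some(Err(msg)), or None for end of stream (EOS).
type Msg = Option<Result<Batch, String>>;

/// Worker: pulls every batch from its input and routes it round-robin to the outputs.
fn pull_from_input(input: Vec<Batch>, outputs: Vec<Sender<Msg>>) -> Result<(), String> {
    for (i, batch) in input.into_iter().enumerate() {
        outputs[i % outputs.len()]
            .send(Some(Ok(batch)))
            .map_err(|e| e.to_string())?;
    }
    Ok(())
}

/// Old shape: two threads per input partition. The second one only waits for
/// the first and then forwards EOS (or the error) to every output.
fn spawn_worker_and_shepherd(input: Vec<Batch>, outputs: Vec<Sender<Msg>>) -> JoinHandle<()> {
    let worker_outputs = outputs.clone();
    let worker = thread::spawn(move || pull_from_input(input, worker_outputs));
    thread::spawn(move || {
        // A panicking worker shows up here as a join error.
        let result = worker
            .join()
            .unwrap_or_else(|_| Err("Join Error".to_string()));
        for tx in &outputs {
            let msg = match &result {
                Ok(()) => None,
                Err(e) => Some(Err(e.clone())),
            };
            let _ = tx.send(msg);
        }
    })
}

fn main() {
    let (tx0, rx0) = channel();
    let (tx1, rx1) = channel();
    spawn_worker_and_shepherd(vec![10, 20, 30], vec![tx0, tx1])
        .join()
        .unwrap();
    let out0: Vec<Msg> = rx0.try_iter().collect();
    let out1: Vec<Msg> = rx1.try_iter().collect();
    println!("{out0:?}\n{out1:?}");
}
```

Each input partition costs two threads here even though the shepherd only joins and forwards, which is the worker/shepherd split the `tokio-console` capture measures.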
   
   A ClickBench run observed under `tokio-console` shows that the shepherd is pure 
overhead: the ~300 captured `SpawnedTask` rows split exactly 1:1 between 
workers (polls ≈ 300–600, busy ≈ hundreds of ms) and shepherds (polls ≈ 3–5, 
busy ≈ 4–30 µs). The shepherds double the number of tokio tasks created per 
`RepartitionExec` and add a cross-task waker round-trip per partition without 
doing any useful work.
   
   Example paired rows from the capture (same total lifetime, radically 
different workload):
   
   | task     | total | busy  | polls |
   |----------|-------|-------|-------|
   | worker   | 615ms | 391ms | 349   |
   | shepherd | 615ms | 22µs  | 4     |
   
   ## What changes are included in this PR?
   
   - Introduce `RepartitionExec::drive_input` that wraps `pull_from_input` in 
`AssertUnwindSafe(..).catch_unwind()` and, after the inner future resolves, 
forwards `None` / error / a synthesized panic error to each output sender from 
the *same* task.
   - Remove `RepartitionExec::wait_for_task`.
   - The outer spawn loop in `RepartitionExecState::consume_input_streams` now 
spawns exactly one task per input partition.
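The single-task shape can be sketched with the same std-only analogue. It assumes threads in place of tokio tasks, channels in place of the output senders, and `std::panic::catch_unwind` in place of the async `AssertUnwindSafe(..).catch_unwind()` from `futures::FutureExt`. `drive_input` borrows the PR's name, but its body is a hypothetical simplification; the zero-batch panic exists only to exercise the panic path.

```rust
use std::any::Any;
use std::panic::{self, AssertUnwindSafe};
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical stand-in for a RecordBatch: just a row count.
type Batch = usize;
// Per-output message: Some(Ok(batch)), Some(Err(msg)), or None for EOS.
type Msg = Option<Result<Batch, String>>;

/// Worker body. A zero-sized batch panics, purely so the panic path can be exercised.
fn pull_from_input(input: Vec<Batch>, outputs: &[Sender<Msg>]) -> Result<(), String> {
    for (i, batch) in input.into_iter().enumerate() {
        assert!(batch > 0, "empty batch");
        outputs[i % outputs.len()]
            .send(Some(Ok(batch)))
            .map_err(|e| e.to_string())?;
    }
    Ok(())
}

/// Extract a readable message from a panic payload (`panic!` yields `&str` or `String`).
fn panic_message(payload: &(dyn Any + Send)) -> String {
    if let Some(s) = payload.downcast_ref::<&str>() {
        s.to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "unknown panic payload".to_string()
    }
}

/// New shape: the same thread drives the input and then forwards EOS, the
/// error, or a synthesized panic error to every output. No shepherd.
fn drive_input(input: Vec<Batch>, outputs: Vec<Sender<Msg>>) {
    let result = panic::catch_unwind(AssertUnwindSafe(|| pull_from_input(input, &outputs)))
        .unwrap_or_else(|payload| Err(format!("task panicked: {}", panic_message(&*payload))));
    for tx in &outputs {
        let msg = match &result {
            Ok(()) => None,
            Err(e) => Some(Err(e.clone())),
        };
        let _ = tx.send(msg);
    }
}

fn main() {
    // One thread per input partition; here a single input partition fans out to two outputs.
    let (tx0, rx0) = channel();
    let (tx1, rx1) = channel();
    thread::spawn(move || drive_input(vec![10, 0, 30], vec![tx0, tx1]))
        .join()
        .unwrap();
    let out0: Vec<Msg> = rx0.try_iter().collect();
    let out1: Vec<Msg> = rx1.try_iter().collect();
    println!("{out0:?}\n{out1:?}");
}
```

The forwarding loop now runs on the same thread, right after the worker body returns, so no second thread and no cross-thread join are needed. One caveat carries over to the async version: `catch_unwind` only intercepts unwinding panics, so a build with `panic = "abort"` still aborts the process.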
   
   Panic safety is preserved: a panic from `pull_from_input` is surfaced as 
`DataFusionError::Context("Join Error", Execution("task panicked: …"))`. 
This mirrors the previous `JoinError`-wrapping path closely enough for existing 
callers, which still see the error wrapped in the same `"Join Error"` context.
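The panic-to-error conversion can be sketched with a hypothetical, trimmed-down `DfError` enum that keeps only the two variants involved. It is not DataFusion's `DataFusionError`, and `panic_to_error` / `root_message` are illustrative names; the sketch only shows how a panic payload might become a `Context("Join Error", Execution("task panicked: …"))`-shaped value.

```rust
use std::any::Any;
use std::panic;

/// Hypothetical stand-in for the two `DataFusionError` variants involved.
#[derive(Debug)]
enum DfError {
    Execution(String),
    Context(String, Box<DfError>),
}

impl DfError {
    /// Innermost message, skipping any context wrappers.
    fn root_message(&self) -> &str {
        match self {
            DfError::Execution(msg) => msg.as_str(),
            DfError::Context(_, inner) => inner.root_message(),
        }
    }
}

/// Turn a caught panic payload into the nested error shape.
fn panic_to_error(payload: Box<dyn Any + Send>) -> DfError {
    // `panic!` payloads are usually `&'static str` (literal message) or `String` (formatted).
    let msg = if let Some(s) = payload.downcast_ref::<&str>() {
        s.to_string()
    } else if let Some(s) = payload.downcast_ref::<String>() {
        s.clone()
    } else {
        "unknown panic payload".to_string()
    };
    DfError::Context(
        "Join Error".to_string(),
        Box::new(DfError::Execution(format!("task panicked: {msg}"))),
    )
}

fn main() {
    let payload = panic::catch_unwind(|| panic!("boom {}", 42)).unwrap_err();
    let err = panic_to_error(payload);
    if let DfError::Context(ctx, _) = &err {
        // Callers matching on the outer "Join Error" context see the same shape as before.
        println!("context = {ctx}, message = {}", err.root_message());
    }
}
```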
   
   **Net effect:** the number of tokio tasks spawned per `RepartitionExec` drops 
from `2 · N` to `N` (where `N` is the number of input partitions), with one 
fewer cross-task waker hop per partition.
   
   ## Are these changes tested?
   
   Yes — covered by existing repartition tests in 
`datafusion/physical-plan/src/repartition/mod.rs`:
   
   - `cargo test -p datafusion-physical-plan --lib repartition::` → **41/41 
pass**, including:
     - `error_for_input_exec` (exercises panic propagation from input)
     - `repartition_with_error_in_stream` (exercises error propagation)
     - All spilling / ordering / drop-cancel tests.
   - `cargo clippy -p datafusion-physical-plan --all-targets -- -D warnings` → 
clean.
   - `cargo fmt --check` → clean.
   
   ## Are there any user-facing changes?
   
   No public API changes. The only behavior difference is the wording of panic 
errors surfaced through repartition: they now carry `"task panicked: {msg}"` 
(synthesized from the panic payload via `downcast_ref`) instead of the exact 
`Display` output of the `JoinError`, but they remain wrapped in 
`DataFusionError::Context("Join Error", …)` as before.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.