wirybeaver opened a new pull request, #1718:
URL: https://github.com/apache/datafusion-ballista/pull/1718
Adds **`SplitPartitionsRule`** — the inverse of #1684's
`CoalescePartitionsRule`. When upstream stats show one shuffle partition is far
larger than the median, the rule fans that partition out across multiple reader
tasks via round-robin assignment over its file list, instead of folding small
partitions together. It reuses #1684's per-stage invocation, alignment-group
leaf walk, and carrier-slot-on-`ExchangeExec` pattern, so the two rules are
strict architectural mirrors.
> **Stacks on #1684.** Until that lands, this PR's diff shows the 5 coalesce
commits + 1 split commit. Once #1684 merges I'll rebase and the diff will
reduce to the single split commit.
Part of the AQE epic #1359. Motivating bug: #1643 (TPC-H Q2 SF1000, one
partition 8670× larger than the median). **v1 does NOT close #1643** — see "v1
scope" below.
## Mechanism — file-list sharding (v1)
The shuffle reader side already lists multiple `PartitionLocation`s per
output partition (one per upstream map task). Splitting just means handing
those locations to several reader tasks via round-robin assignment by file
index. No row-range reads, no protobuf changes, no executor data-path changes —
pure scheduler/adapter work.
Tradeoff: a partition backed by only one file can't be split this way. The
rule bails on that idx (factor stays at 1). Row-range reads to lift that
restriction live in a v2 task doc (linked at the bottom).
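A minimal sketch of the round-robin assignment, assuming a partition's upstream files arrive as a plain ordered list. `shard_files` is a hypothetical name, not the PR's code. It also shows why a single-file partition gains nothing: every file lands on task 0.

```rust
/// Hypothetical helper: assign one partition's files to `factor` reader
/// tasks round-robin by file index (file i goes to task i % factor).
fn shard_files<T: Clone>(files: &[T], factor: usize) -> Vec<Vec<T>> {
    assert!(factor >= 1, "split factor must be at least 1");
    // One (possibly empty) file list per reader task.
    let mut tasks: Vec<Vec<T>> = vec![Vec::new(); factor];
    for (file_idx, file) in files.iter().enumerate() {
        tasks[file_idx % factor].push(file.clone());
    }
    tasks
}
```

With five files and a factor of 2, task 0 reads files 0, 2, 4 and task 1 reads files 1, 3. With one file and a factor of 3, tasks 1 and 2 would read nothing, which is why the rule keeps the factor at 1 in that case.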
## v1 scope — and what it does NOT cover
File-list sharding produces `UnknownPartitioning(K')` output. Rows that used
to land in the same hash bucket on the M-side end up scattered across multiple
K'-side partitions (the round-robin assignment is file-keyed, not row-keyed).
That breaks any downstream operator with a hash or single-partition input
requirement:
- `HashJoinExec(Partitioned)` and `SortMergeJoinExec` — both legs must agree
on hash buckets.
- `AggregateExec(FinalPartitioned)` — assumes each downstream partition
holds a closed set of group keys.
- Any future operator whose `required_input_distribution()` returns
`HashPartitioned(_)` or `SinglePartition`.
Rather than enumerate operator types, the rule walks the stage subtree and
inspects `required_input_distribution()` directly. If any node above the leaves
demands hash or single-partition input, the rule bails the whole stage.
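This is strictly correct and future-proof against new DataFusion operators: any
operator that declares a hash or single-partition input requirement is caught,
whatever its type.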
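The walk can be sketched over a toy plan tree. `Distribution` here is a simplified stand-in shaped like DataFusion's enum of the same name (hash keys reduced to strings), and `Node`, `walk`, and `scan_stage` are hypothetical; the real rule works on `Arc<dyn ExecutionPlan>` and `required_input_distribution()`.

```rust
/// Simplified stand-in for DataFusion's `Distribution` (keys as strings).
#[allow(dead_code)]
#[derive(Debug, Clone, PartialEq)]
enum Distribution {
    UnspecifiedDistribution,
    SinglePartition,
    HashPartitioned(Vec<String>),
}

/// Hypothetical plan node: either a leaf exchange or an operator with children.
struct Node {
    name: &'static str,
    is_leaf_exchange: bool,
    required_input_distribution: Vec<Distribution>,
    children: Vec<Node>,
}

/// Returns false as soon as any operator above the leaves demands hash or
/// single-partition input; otherwise records every leaf exchange it reaches.
fn walk(node: &Node, leaves: &mut Vec<&'static str>) -> bool {
    if node.is_leaf_exchange {
        leaves.push(node.name);
        return true; // "Jump": do not descend below a leaf exchange
    }
    let demands_partitioning = node.required_input_distribution.iter().any(|d| {
        matches!(d, Distribution::HashPartitioned(_) | Distribution::SinglePartition)
    });
    if demands_partitioning {
        return false; // bail the whole stage
    }
    node.children.iter().all(|child| walk(child, leaves))
}

/// `None` means "bail"; `Some(leaves)` means the stage is a split candidate.
fn scan_stage(root: &Node) -> Option<Vec<&'static str>> {
    let mut leaves = Vec::new();
    if walk(root, &mut leaves) {
        Some(leaves)
    } else {
        None
    }
}
```

A single pass both enforces the bail condition and collects the leaves, matching step 2 of the algorithm.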
**TPC-H Q2's skew sits behind a `FinalPartitioned` aggregate**, so v1 cannot
help it. v1 helps the narrower set of stages where the consumer is
distribution-agnostic (`FilterExec`, `ProjectionExec`, `LocalLimitExec`,
single-input scans into `UnknownPartitioning` sinks). The infrastructure is the
win; v2 (row-range reads + aggregate-aware plan rewriting) is where Q2 lands.
## Surface
Opt-in via a single boolean. **`ballista.planner.split.enabled=false`** is
the default: the rule short-circuits and the plan flows through untouched.
Users who want skew handling set it to `true`.
| key | default | meaning |
|---|---|---|
| `ballista.planner.split.enabled` | `false` | master gate |
| `ballista.planner.split.skew_factor` | `5.0` | Spark `OptimizeSkewedJoin` default; a partition is "skewed" if `bytes > skew_factor × median` |
| `ballista.planner.split.min_split_bytes` | 64 MiB | absolute floor; don't fan out trivially small partitions |
| `ballista.planner.split.max_split_factor` | `8` | per-partition fan-out cap to limit executor pressure |
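To show how the four keys and their defaults fit together, here is a hypothetical typed view. `SplitConfig`, `from_settings`, and the assumption that `min_split_bytes` is supplied as a plain byte count are illustrative only; Ballista's real config plumbing may differ.

```rust
use std::collections::HashMap;

/// Hypothetical typed view of the four split keys.
#[derive(Debug, Clone, PartialEq)]
struct SplitConfig {
    enabled: bool,
    skew_factor: f64,
    min_split_bytes: u64,
    max_split_factor: usize,
}

impl Default for SplitConfig {
    fn default() -> Self {
        Self {
            enabled: false,                    // off unless explicitly enabled
            skew_factor: 5.0,                  // bytes > 5.0 x median => skewed
            min_split_bytes: 64 * 1024 * 1024, // 64 MiB floor
            max_split_factor: 8,               // per-partition fan-out cap
        }
    }
}

/// Parses one key if present; missing or unparsable values yield `None`.
fn parse<T: std::str::FromStr>(settings: &HashMap<String, String>, key: &str) -> Option<T> {
    settings.get(key).and_then(|v| v.trim().parse().ok())
}

impl SplitConfig {
    /// Overlays string settings on the defaults; bad values keep the default.
    fn from_settings(settings: &HashMap<String, String>) -> Self {
        let mut cfg = Self::default();
        if let Some(v) = parse::<bool>(settings, "ballista.planner.split.enabled") {
            cfg.enabled = v;
        }
        if let Some(v) = parse::<f64>(settings, "ballista.planner.split.skew_factor") {
            cfg.skew_factor = v;
        }
        if let Some(v) = parse::<u64>(settings, "ballista.planner.split.min_split_bytes") {
            cfg.min_split_bytes = v;
        }
        if let Some(v) = parse::<usize>(settings, "ballista.planner.split.max_split_factor") {
            cfg.max_split_factor = v;
        }
        cfg
    }
}
```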
## Algorithm
For each per-stage call (mirrors the coalesce rule line-for-line):
1. Bail if disabled.
2. Single subtree walk that does two things: bail if any operator demands
`HashPartitioned`/`SinglePartition` input, AND collect every leaf
`ExchangeExec` (`Jump` after each hit, same convention as #1684).
3. Conflict guard: bail if any leaf already has `coalesce()` or `split()`
set.
4. Alignment-group invariant: bail unless every leaf shares the same M (the
Q22 guard, identical to the one #1684 added in `fix(coalesce): bail on heterogeneous M`).
5. Sum byte sizes element-wise across the alignment group; capture leaf-0's
file-count vector.
6. `decide_split_factors(summed_bytes, file_counts, skew, min_bytes,
max_factor)` — three guards (single-file → factor 1; below min-bytes → factor
1; below skew ratio → factor 1) then `ceil(bytes / median)` capped at
`max_factor`, floored at 2.
7. If every factor is 1, bail.
8. Build one `SplitPlan` with `shards = factors_to_shards(&factors)` and
attach uniformly to every leaf via `set_split` — sharing the plan (not just K')
keeps per-idx fan-out identical across leaves, which matters for non-join
multi-leaf shapes (UNION).
Sharing the plan across the alignment group is the same invariant #1684
enforces for coalesce: it keeps K' uniform downstream.
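Step 6 can be sketched as a pure function. This follows the three guards and the `ceil(bytes / median)` rule described above, but the median definition (mean of the two middle values for an even count, similar to Spark's) and clamping the median to at least 1 byte as the zero-median guard are assumptions, not the PR's code.

```rust
/// Median partition size, clamped to >= 1 byte so the division below can
/// never be by zero (assumed form of the zero-median guard).
fn median_bytes(bytes: &[u64]) -> u64 {
    let mut sorted = bytes.to_vec();
    sorted.sort_unstable();
    let n = sorted.len();
    let mid = if n % 2 == 0 {
        // Widen to u128 so summing two large sizes cannot overflow.
        ((sorted[n / 2 - 1] as u128 + sorted[n / 2] as u128) / 2) as u64
    } else {
        sorted[n / 2]
    };
    mid.max(1)
}

/// Sketch of the split decision: one factor per upstream partition index.
fn decide_split_factors(
    bytes: &[u64],
    file_counts: &[usize],
    skew_factor: f64,
    min_split_bytes: u64,
    max_split_factor: usize,
) -> Vec<usize> {
    assert_eq!(bytes.len(), file_counts.len(), "one file count per partition");
    if bytes.is_empty() {
        return Vec::new();
    }
    let median = median_bytes(bytes);
    bytes
        .iter()
        .zip(file_counts)
        .map(|(&b, &files)| {
            // Guards: single file, below the absolute floor, or not skewed enough.
            if files < 2 || b < min_split_bytes || (b as f64) <= skew_factor * median as f64 {
                return 1;
            }
            // ceil(bytes / median), capped at max_split_factor, floored at 2.
            let raw = b.div_ceil(median) as usize;
            raw.min(max_split_factor).max(2)
        })
        .collect()
}
```

With three 100 MiB partitions and one at 409,600 MiB (4096× the median), the outlier's raw factor of 4096 is capped to 8, giving `[1, 1, 1, 8]`.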
## Components
| Area | Change |
|---|---|
| **Rule** | `SplitPartitionsRule` in `state/aqe/optimizer_rule/split_partitions.rs`. Unit struct. Invoked per-stage in `AdaptivePlanner::actionable_stages()` right after `CoalescePartitionsRule`. |
| **Carrier** | `ExchangeExec` gains an `Arc<Mutex<Option<Arc<SplitPlan>>>>` slot next to the existing `coalesce` slot, with `set_split` / `split` accessors. The `split=K' of M` annotation appears in `DisplayAs` output only when a plan is attached. Mutually exclusive with `coalesce`; the rule's conflict guard enforces this. |
| **Adapter** | `BallistaAdapter::transform_children` checks `exchange.split()`; on `Some(sp)` it pre-shards the M-shaped upstream `Vec<Vec<PartitionLocation>>` into K' shape via `SplitShard::owns_file(file_idx)` and builds the reader via `ShuffleReaderExec::try_new_split` with `UnknownPartitioning(K')`. |
| **Reader** | `ShuffleReaderExec::try_new_split` constructor + `split: Option<SplitPlan>` field, threaded through `with_work_dir`, `with_client_pool`, `with_new_children`, `partition_statistics`, and `DisplayAs`. `execute()` needs no changes: `self.partition[idx]` already returns the per-output-partition `Vec<PartitionLocation>`, which the adapter has pre-sharded, so the reader is oblivious to whether sharding happened. |
| **Algorithm** | Pure-CPU helpers in `state/aqe/split/algorithm.rs`: `decide_split_factors` (median-based skew detection mirroring Spark's `OptimizeSkewedJoin`) and `factors_to_shards` (per-idx factor → flat shard list). |
| **Config** | Four new keys (table above). |
`SplitPlan` is NOT round-tripped through proto in v1 — the rule attaches it
on the scheduler side, the adapter consumes it inline, and the resulting
`ShuffleReaderExec` ships already-sharded `Vec<Vec<PartitionLocation>>`. A
protobuf round-trip would only be needed if the rule outcome had to survive
serialization to the executor, which it doesn't.
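A sketch of the M → K' reshaping the adapter performs: `factors_to_shards` flattens per-index factors into output shards, and each shard keeps only the files it owns. The `SplitShard` fields, `pre_shard`, and the generic location type `L` (standing in for `PartitionLocation`) are hypothetical; the PR's actual types may carry different data.

```rust
/// Hypothetical shard descriptor: output shard `shard` of `factor` shards
/// carved out of upstream partition `src_partition`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct SplitShard {
    src_partition: usize,
    shard: usize,
    factor: usize,
}

impl SplitShard {
    /// Round-robin ownership by file index; factor 1 owns every file.
    fn owns_file(&self, file_idx: usize) -> bool {
        file_idx % self.factor == self.shard
    }
}

/// Per-index factors -> flat list of K' shards, in upstream partition order.
fn factors_to_shards(factors: &[usize]) -> Vec<SplitShard> {
    factors
        .iter()
        .enumerate()
        .flat_map(|(p, &f)| (0..f).map(move |s| SplitShard { src_partition: p, shard: s, factor: f }))
        .collect()
}

/// Reshapes M-shaped upstream locations into K'-shaped reader input.
fn pre_shard<L: Clone>(upstream: &[Vec<L>], shards: &[SplitShard]) -> Vec<Vec<L>> {
    shards
        .iter()
        .map(|sh| {
            upstream[sh.src_partition]
                .iter()
                .enumerate()
                .filter(|(i, _)| sh.owns_file(*i))
                .map(|(_, loc)| loc.clone())
                .collect()
        })
        .collect()
}
```

For factors `[1, 1, 1, 8]` this yields K' = 3 + 8 = 11 shards. If index 3 has only four files, shards 4 through 7 of that index receive nothing in this sketch, since nothing here caps the factor at the file count.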
## Test plan
- [x] `cargo test --workspace --no-fail-fast` — workspace tests pass, 0
failures (includes the 7 coalesce regressions from #1684 plus the 8 new split
tests).
- [x] `cargo clippy --workspace --all-targets --tests` — 0 warnings.
- [x] `cargo fmt --all` — clean.
- [x] 9 unit tests in `state/aqe/split/algorithm.rs` covering the decision
function: no-skew, single-outlier, below-min-bytes, single-file guard,
max-factor cap, zero-median (div-by-zero guard), empty input, shard expansion
mixed factors, all-passthrough.
- [x] 8 functional tests in `state::aqe::test::split_rule`:
  1. `bails_on_final_partitioned_aggregate`: heavy skew, GROUP BY plan; snapshot confirms no `split=` annotation on the leaf.
  2. `bails_on_partitioned_hash_join`: `HashJoinExec(Partitioned)`, both legs.
  3. `bails_on_sort_merge_join`: `SortMergeJoinExec`, both legs.
  4. `skips_when_disabled`: synthetic safe shape, rule off, no slot attached.
  5. `attaches_split_when_skewed_partition_has_multiple_files`: happy path, M=4, idx 3 with 4 files and 4096× the median → leaf's `SplitPlan` holds K' = 3 passthroughs + 8 split shards = 11 (max-factor cap).
  6. `skips_when_single_file_in_skewed_idx`: v1 single-file guard.
  7. `idempotent_on_second_pass`: two consecutive `optimize` calls produce the same `Arc`; slot not overwritten.
  8. `default_off_returns_input_arc_verbatim`: `Arc::ptr_eq(input, output)` when disabled.
- [ ] **TPC-H SF=100 sanity sweep, 22 queries × 2 join variants,
`split.enabled=true`**: deferred to PR validation env. Expected outcome is
mostly non-regression — most queries hit a hash or `FinalPartitioned` consumer
and the rule bails. If any query *does* fire the rule (a stage ending in
`Filter` / `Projection` / `LocalLimit` over a hash exchange), I'll record the K
→ K' increase in a comment.
## v2 follow-up
The honest scope limitation in v1 (file-list sharding requires ≥2 files per
partition; bails on hash/single consumers) is lifted by v2 — row-range reads
(`PartitionLocation::row_range: Option<(u64, u64)>`, batch-count IPC reader on
the executor) AND aggregate-aware splitting (rewrite
`AggregateExec(FinalPartitioned)` into per-shard partial + reshuffle + final).
That's the path #1643 will actually land on. Task doc written and cross-linked.
🤖 Generated with [Claude Code](https://claude.com/claude-code)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]