andygrove opened a new pull request, #1592:
URL: https://github.com/apache/datafusion-ballista/pull/1592
# Which issue does this PR close?
Closes #1591.
# Rationale for this change
The Python `BallistaSessionContext` did not forward DataFusion / Ballista
session configuration to the scheduler-side session. In particular, there
was no way to set `datafusion.execution.target_partitions` from Python,
so a Python client running on a small pod against a much larger cluster
ended up with a partition count equal to the client pod's CPU count,
underutilizing the cluster.
The mechanism for shipping the client config is already in place. Every
`DistributedQueryExec::execute()` call serializes the entire client
`SessionConfig` into the gRPC `ExecuteQueryParams.settings` field, and the
scheduler applies it via `update_from_key_value_pair`. The Python entry
point just wasn't taking advantage of it: `create_ballista_data_frame`
built the remote session from
`SessionStateBuilder::new_with_default_features()`, which defaults
`target_partitions` to `num_cpus::get()` on the client pod. That
client-derived value was then shipped to the scheduler, overriding the
scheduler's defaults (`SessionConfig::new_with_ballista()` sets
`target_partitions = 16`).
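
To make the failure mode concrete, here is a toy Python model of the settings flow. It is a sketch under stated assumptions: plain dicts stand in for `SessionConfig` and the gRPC `settings` field, and `client_settings` / `scheduler_session` are hypothetical names, not Ballista APIs. It shows that when the client serializes every key, its CPU-derived `target_partitions` wins over the scheduler's default.

```python
"""Toy model of client-config shipping (hypothetical, not Ballista's API)."""

TARGET_PARTITIONS = "datafusion.execution.target_partitions"

# Hypothetical scheduler-side default, mirroring the value attributed
# to SessionConfig::new_with_ballista().
SCHEDULER_DEFAULTS = {TARGET_PARTITIONS: "16"}


def client_settings(client_cpus: int) -> dict[str, str]:
    """Model a client session built from stock DataFusion defaults.

    Every key is serialized, including target_partitions derived from the
    client's CPU count (the num_cpus::get() default).
    """
    return {TARGET_PARTITIONS: str(client_cpus)}


def scheduler_session(settings: dict[str, str]) -> dict[str, str]:
    """Model the scheduler applying shipped key/value pairs over its defaults."""
    session = dict(SCHEDULER_DEFAULTS)  # copy so the defaults stay untouched
    for key, value in settings.items():
        session[key] = value  # each shipped pair overwrites the default
    return session
```

With a 2-CPU client, `scheduler_session(client_settings(2))` ends up with `target_partitions` set to `"2"` rather than `"16"`.
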
Setting the value via SQL `SET ...` from the Python client also did
not work, because that statement runs against the local DataFusion
session and `_to_internal_df()` constructs a fresh remote session for
each query.
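
The `SET` problem can be sketched the same way. This is a hypothetical model, not the real client: `ToyClient`, `sql_set`, and `remote_settings_for_query` are illustrative names. It assumes, as described above, that `SET` only touches the local session and that each query builds a fresh remote session from stock defaults.

```python
"""Toy model of why SQL SET did not reach the scheduler (hypothetical)."""

TARGET_PARTITIONS = "datafusion.execution.target_partitions"


class ToyClient:
    """Hypothetical stand-in for the Python client before this PR."""

    def __init__(self, client_cpus: int) -> None:
        self.client_cpus = client_cpus
        # Local DataFusion session state; SQL SET statements land here.
        self.local_settings: dict[str, str] = {}

    def sql_set(self, key: str, value: str) -> None:
        # SET mutates only the local session.
        self.local_settings[key] = value

    def remote_settings_for_query(self) -> dict[str, str]:
        # Each query builds a fresh remote session from stock defaults, so
        # nothing recorded by sql_set() is carried over.
        return {TARGET_PARTITIONS: str(self.client_cpus)}


client = ToyClient(client_cpus=4)
client.sql_set(TARGET_PARTITIONS, "256")
print(client.remote_settings_for_query())
# {'datafusion.execution.target_partitions': '4'}
```
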
# What changes are included in this PR?
- `python/src/lib.rs` — `create_ballista_data_frame` now starts the
remote session from `SessionConfig::new_with_ballista()` (so it picks
up Ballista-friendly defaults) and accepts an optional dict of
configuration overrides applied via
`SessionConfigHelperExt::update_from_key_value_pair_mut`.
- `python/python/ballista/extension.py` — `BallistaSessionContext`
accepts an optional `cluster_config: dict[str, str]`, stores it, and
threads it through every `DistributedDataFrame` so `_to_internal_df()`
can forward it to the Rust entry point.
- A test asserting the dict is propagated end-to-end through the
wrapper.
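
The threading described in the list can be illustrated with stand-in classes. Everything here is hypothetical: `ToySessionContext`, `ToyDataFrame`, and the injected `entry_point` callable (standing in for the Rust `create_ballista_data_frame`) are illustrative names, and the real wrapper's constructor signature may differ. The pytest-style test mirrors the kind of end-to-end propagation check described above, using a fake entry point instead of a live scheduler.

```python
"""Sketch of threading cluster_config from context to entry point (hypothetical)."""
from typing import Callable, Optional

# Hypothetical entry-point type: (plan, config overrides) -> result.
EntryPoint = Callable[[str, Optional[dict[str, str]]], object]


class ToyDataFrame:
    """Stand-in for DistributedDataFrame: remembers the context's config."""

    def __init__(self, plan: str, cluster_config: Optional[dict[str, str]],
                 entry_point: EntryPoint) -> None:
        self._plan = plan
        self._cluster_config = cluster_config
        self._entry_point = entry_point

    def _to_internal_df(self) -> object:
        # Forward the overrides so the remote session is built with them.
        return self._entry_point(self._plan, self._cluster_config)


class ToySessionContext:
    """Stand-in for BallistaSessionContext with an optional cluster_config."""

    def __init__(self, address: str, entry_point: EntryPoint,
                 cluster_config: Optional[dict[str, str]] = None) -> None:
        self.address = address
        self._entry_point = entry_point
        self._cluster_config = cluster_config

    def sql(self, query: str) -> ToyDataFrame:
        # Every DataFrame carries the config the context was built with.
        return ToyDataFrame(query, self._cluster_config, self._entry_point)


def test_cluster_config_is_forwarded() -> None:
    calls = []

    def fake_entry_point(plan, cluster_config):
        calls.append((plan, cluster_config))
        return "ok"

    cfg = {"datafusion.execution.target_partitions": "256"}
    ctx = ToySessionContext("df://localhost:50050", fake_entry_point,
                            cluster_config=cfg)
    assert ctx.sql("SELECT 1")._to_internal_df() == "ok"
    assert calls == [("SELECT 1", cfg)]
```

Injecting the entry point keeps the check independent of a running cluster; the real test may hook into the Rust binding differently.
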
# Are there any user-facing changes?
Yes — `BallistaSessionContext` gains a new optional `cluster_config`
keyword argument:
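```python
from ballista import BallistaSessionContext

# Forward session settings to the scheduler-side session for every query.
ctx = BallistaSessionContext(
    "df://localhost:50050",
    cluster_config={"datafusion.execution.target_partitions": "256"},
)
```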
The argument is optional and existing call sites are unaffected.
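
Because `cluster_config` is typed `dict[str, str]`, numeric or boolean settings have to be stringified first. The helper below is a hypothetical convenience, not part of this PR; `to_cluster_config` is an illustrative name. It lowercases booleans to match the `true`/`false` spelling used in DataFusion's configuration docs.

```python
"""Hypothetical helper: build a dict[str, str] suitable for cluster_config."""
from typing import Union

ConfigValue = Union[str, int, bool]


def to_cluster_config(settings: dict[str, ConfigValue]) -> dict[str, str]:
    """Stringify values, spelling booleans as lowercase 'true'/'false'."""
    out: dict[str, str] = {}
    for key, value in settings.items():
        if isinstance(value, bool):  # check bool before int: bool subclasses int
            out[key] = "true" if value else "false"
        else:
            out[key] = str(value)
    return out
```

For example, `to_cluster_config({"datafusion.execution.target_partitions": 256})` returns `{"datafusion.execution.target_partitions": "256"}`, which can then be passed as `cluster_config=`.
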