timsaucer opened a new pull request, #1544:
URL: https://github.com/apache/datafusion-python/pull/1544

   # Which issue does this PR close?
   
   No associated issue. This is **PR 1 of 4** splitting up the work originally 
on `feat/expr-pickle` (~2.8k LOC) into reviewable chunks. The four PRs stack 
sequentially on top of this one; subsequent PRs target this branch's tip until 
it merges.
   
   # Rationale for this change
   
   Today a `LogicalPlan` or `Expr` referencing a Python-defined `ScalarUDF` 
cannot survive a serialization round-trip without the receiver pre-registering 
a matching UDF, because the upstream protobuf codecs only carry the UDF *name*. 
That blocks shipping expressions to worker processes via `pickle.dumps` / 
`multiprocessing.Pool` / Ray actors / `datafusion-distributed`. This PR closes 
the scalar-UDF case end-to-end so the natural `pickle.dumps(expr)` pattern 
works for built-ins and Python scalar UDFs with no receiver-side setup.
   
   # What changes are included in this PR?
   
   Adds Python-aware encoding to `PythonLogicalCodec` / `PythonPhysicalCodec` 
so a `ScalarUDF` whose impl is `PythonFunctionScalarUDF` travels inside the 
serialized expression: the codec cloudpickles `(name, func, input_schema, 
return_field, volatility)` into `fun_definition`. Non-Python UDFs delegate to 
`inner` unchanged.
   
   Wire format is `<DFPYUDF magic, version byte, cloudpickle blob>`. The 
version byte lets a too-new/too-old payload surface a clean `Execution` error 
instead of an opaque cloudpickle unpack failure. Schema serde uses arrow-rs's 
native IPC stream writer (no pyarrow round-trip), and the `cloudpickle` module 
handle is cached per-interpreter through `PyOnceLock` so plans with many UDFs 
don't pay the import cost per UDF.
   
   On the Python side, `Expr` gains `__reduce__` plus a classmethod 
`from_bytes(buf, ctx=None)`. A new `datafusion.ipc` module exposes 
`set_worker_ctx` / `get_worker_ctx` / `clear_worker_ctx` thread-locals; 
`_resolve_ctx` consults explicit-ctx > worker-ctx > global `SessionContext`. 
FFI UDFs still travel by name and need the matching registration on the 
receiver's context. `ScalarUDF.name` is exposed as a property so `pickle` 
callers can introspect after a round-trip.
   
   `PythonFunctionScalarUDF` is bumped to `pub(crate)` and gets `func()` / 
`return_field()` accessors and a `from_parts` constructor for the codec. The 
`PartialEq` impl picks up a pointer-identity fast path (the common case is 
`Arc`-shared clones of the same UDF) and logs Python `__eq__` exceptions at 
`debug` instead of silently treating them as `false`. The `Hash` impl hashes 
the identifying header only — the prior `unhashable -> 0` fallback collapsed 
all unhashable closures to one bucket, which is the pathological case for a 
hashmap; `PartialEq` still disambiguates.
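   A Python analogue of the equality/hash change (the real code is Rust; field names here are illustrative):

```python
class PyScalarUDF:
    """Sketch of the identity fast path and header-only hashing."""

    def __init__(self, name, func, volatility):
        self.name = name
        self.func = func
        self.volatility = volatility

    def __eq__(self, other):
        if self is other:
            # Pointer-identity fast path: shared clones of the same
            # UDF compare equal without touching Python callables.
            return True
        if not isinstance(other, PyScalarUDF):
            return NotImplemented
        # Fall back to the identifying header, then the callable itself.
        return (self.name, self.volatility) == (other.name, other.volatility) \
            and self.func == other.func

    def __hash__(self):
        # Hash only the identifying header: an unhashable closure no
        # longer collapses every UDF into one bucket, and __eq__ still
        # disambiguates any collisions.
        return hash((self.name, self.volatility))
```

   Hashing a subset of the fields compared by `__eq__` is safe as long as equal objects still hash equal; it trades a few extra bucket collisions for never hitting the pathological all-in-one-bucket case.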
   
   `cloudpickle>=2.0` is added as a runtime dependency (lazy-imported on the 
encode/decode hot path).
   
   Aggregate and window inline encoding, the per-session 
`with_python_udf_inlining` toggle, sender-side context wiring, and the 
user-guide docs land in PRs 2-4 of this series.
   
   # Are there any user-facing changes?
   
   Yes:
   
   - `Expr` is now picklable. Built-ins and Python scalar UDFs round-trip with 
no worker-side setup.
   - New `Expr.to_bytes(ctx=None)` / `Expr.from_bytes(buf, ctx=None)` 
signatures. `from_bytes` is now a `classmethod` with `ctx` as a keyword-only 
parameter defaulting to `None`. **Breaking** for any direct 
`Expr.from_bytes(ctx, blob)` callers; the in-tree call sites are updated.
   - New public module `datafusion.ipc` with `set_worker_ctx` / 
`get_worker_ctx` / `clear_worker_ctx`.
   - New `ScalarUDF.name` property.
   - New runtime dependency on `cloudpickle>=2.0`.
   
   Adding the `api change` label because of the `Expr.from_bytes` signature 
change.


