adriangb opened a new pull request, #21786:
URL: https://github.com/apache/datafusion/pull/21786

   ## Which issue does this PR close?
   
   - Alternative to #20416. Informs 
https://github.com/datafusion-contrib/datafusion-distributed/issues/180 and 
closes #20418.
   
   ## Rationale for this change
   
   Given a plan like
   
   ```text
   HashJoinExec(dynamic_filter_1 on a@0)
     (...left side of join)
     ProjectionExec(a := Column("a", source_index))
       DataSourceExec
         ParquetSource(predicate = dynamic_filter_2)
   ```
   
   after a serialize/deserialize round-trip the two `DynamicFilterPhysicalExpr` 
wrappers should still share the same mutable `inner`, so that an update from the 
`HashJoinExec` remains visible at the pushed-down `ParquetSource`. Today this breaks for two reasons:
   
   1. `serialize_physical_expr_with_converter` calls `snapshot_physical_expr`, 
which replaces `DynamicFilterPhysicalExpr` with its current inner expression 
(often `lit(true)`) — identity is lost.
   2. The existing dedup key hashes the outer `Arc::as_ptr`, but the 
`HashJoinExec` side and the `ParquetSource` side hold different outer `Arc`s 
(one comes from `with_new_children`), so they never share `expr_id`.
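   The pointer-identity failure is easy to reproduce in isolation. The sketch 
below uses a simplified stand-in for `DynamicFilterPhysicalExpr` (an outer 
wrapper around `Arc<RwLock<...>>` state; the names are illustrative, not 
DataFusion's actual types):

   ```rust
   use std::sync::{Arc, RwLock};

   // Simplified stand-in for DynamicFilterPhysicalExpr: an outer wrapper
   // around shared mutable state.
   struct DynFilter {
       inner: Arc<RwLock<String>>, // the mutable "current expression"
   }

   fn main() {
       let shared = Arc::new(RwLock::new("lit(true)".to_string()));

       // The HashJoinExec side and the ParquetSource side each hold their own
       // outer Arc (the second typically produced by `with_new_children`).
       let join_side = Arc::new(DynFilter { inner: Arc::clone(&shared) });
       let scan_side = Arc::new(DynFilter { inner: Arc::clone(&shared) });

       // The outer pointers differ, so an `Arc::as_ptr`-based dedup key
       // never matches across the two sites...
       assert_ne!(Arc::as_ptr(&join_side), Arc::as_ptr(&scan_side));

       // ...even though both observe the same mutable inner state.
       *join_side.inner.write().unwrap() = "a@0 IN (1, 2, 3)".to_string();
       assert_eq!(&*scan_side.inner.read().unwrap(), "a@0 IN (1, 2, 3)");
   }
   ```

   Any dedup key derived from the outer pointer therefore splits what is 
logically one filter into two distinct `expr_id`s.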
   
   ## What changes are included in this PR?
   
   - `PhysicalExpr::expression_id(&self) -> Option<u64>` added to the trait, 
defaulting to `None`. Only `DynamicFilterPhysicalExpr` reports an identity 
today.
   - `DynamicFilterPhysicalExpr` gains a stable `u64` id (random at 
construction). It follows the shared `Arc<RwLock<Inner>>` across 
`with_new_children` — two wrappers that observe the same mutable state report 
the same id. `id` is omitted from `Debug` to keep plan snapshots deterministic.
   - New `with_id_and_state(id, children, inner, generation, is_complete)` 
constructor used on the deserialize side to rehydrate the filter. `generation` 
and `is_complete` survive the proto round-trip for fidelity (matches the state 
that already lives on `Inner`). `state_watch` is always fresh on the receiver; 
cross-process update propagation remains out of scope.
   - New proto message `PhysicalDynamicFilterExprNode { id, current_expr, 
original_children, effective_children, generation, is_complete }`. The 
serializer downcasts `DynamicFilterPhysicalExpr` and emits this variant 
directly; the top-level `snapshot_physical_expr` call is removed. The 
`HashTableLookupExpr → lit(true)` replacement still fires (unchanged).
   - `DeduplicatingSerializer` is now stateless: it stamps `expr_id = 
expr.expression_id()`. The old `session_id`/`Arc::as_ptr`/`pid` hashing is 
dropped, along with the implicit within-process dedup for generic exprs. A 
follow-up can restore that for specific types (e.g. `InList`) by implementing 
`expression_id()` on them.
   - `DeduplicatingDeserializer` intercepts `DynamicFilter` before the generic 
cache lookup: the cache stores the canonical (un-remapped) wrapper keyed by 
`id`, and each call site's `effective_children` gets applied via 
`with_new_children` to produce the site-specific wrapper while sharing the 
canonical's `inner`.
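   Taken together, the identity hook and the deserializer's canonical cache can 
be sketched as follows. This is a minimal model with simplified stand-in types 
(`Expr`, `DynFilter`, `rehydrate` are illustrative names, not DataFusion's real 
API), showing the intended invariants: shared `inner`, stable id, per-site 
children.

   ```rust
   use std::collections::HashMap;
   use std::sync::{Arc, RwLock};

   // Sketch of the opt-in identity hook. Names mirror the PR description,
   // but the types are simplified stand-ins, not DataFusion's real traits.
   trait Expr {
       /// New trait method: the default `None` means "no stable identity,
       /// no dedup" for types that do not opt in.
       fn expression_id(&self) -> Option<u64> {
           None
       }
   }

   // A generic expression keeps the default and is never deduplicated.
   struct Literal;
   impl Expr for Literal {}

   // Stand-in for DynamicFilterPhysicalExpr: a stable id plus shared state.
   struct DynFilter {
       id: u64,                    // minted once at construction (random in the PR)
       inner: Arc<RwLock<String>>, // shared mutable "current expression"
       children: Vec<String>,      // site-specific (possibly remapped) columns
   }

   impl Expr for DynFilter {
       fn expression_id(&self) -> Option<u64> {
           Some(self.id) // identity follows the shared state, not the outer Arc
       }
   }

   impl DynFilter {
       // `with_new_children` swaps only the children; `id` and `inner` carry over.
       fn with_new_children(&self, children: Vec<String>) -> DynFilter {
           DynFilter {
               id: self.id,
               inner: Arc::clone(&self.inner),
               children,
           }
       }
   }

   // Deserializer side: cache the canonical wrapper by `id`, then apply each
   // call site's `effective_children` to get a site-specific wrapper that
   // still shares the canonical's `inner`.
   fn rehydrate(
       cache: &mut HashMap<u64, Arc<DynFilter>>,
       id: u64,
       effective_children: Vec<String>,
   ) -> DynFilter {
       let canonical = cache.entry(id).or_insert_with(|| {
           Arc::new(DynFilter {
               id,
               inner: Arc::new(RwLock::new("lit(true)".to_string())),
               children: Vec::new(),
           })
       });
       canonical.with_new_children(effective_children)
   }

   fn main() {
       assert_eq!(Literal.expression_id(), None);

       let mut cache = HashMap::new();
       let join_side = rehydrate(&mut cache, 42, vec!["a@0".to_string()]);
       let scan_side = rehydrate(&mut cache, 42, vec!["a@2".to_string()]);

       // Both sites report the same identity and share `inner`...
       assert_eq!(join_side.expression_id(), scan_side.expression_id());
       *join_side.inner.write().unwrap() = "a IN (1, 2, 3)".to_string();
       assert_eq!(&*scan_side.inner.read().unwrap(), "a IN (1, 2, 3)");

       // ...while each keeps its own site-specific column mapping.
       assert_ne!(join_side.children, scan_side.children);
   }
   ```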
   
   Compared to #20416: that PR introduces a generic `PhysicalExprId { exact, 
shallow }` and keeps the Arc-ptr default so every expression is stamped. This 
PR instead makes the identity hook opt-in per type (`expression_id()`), 
restricting the blast radius to the one type that actually needs it and letting 
the follow-up decide per-type whether generic dedup is worth re-introducing.
   
   ## Are these changes tested?
   
   Yes:
   
   - 
`dynamic_filters::test::test_expression_id_stable_across_with_new_children` — 
unit test verifying the id survives `with_new_children`.
   - `roundtrip_dynamic_filter_preserves_shared_inner` — two wrappers in a 
`BinaryExpr(And)` predicate share `inner` after round-trip; an `update()` on 
one is observable via the other.
   - `roundtrip_dynamic_filter_preserves_remapped_children` — two wrappers with 
different effective children; each preserves its site-specific projection after 
round-trip, both share identity, `update()` propagates, and `current()` on the 
remapped side applies the column substitution.
   - `roundtrip_dynamic_filter_in_parquet_pushdown` — exercises the pushdown 
plan shape from the PR description, here realized as `FilterExec → ProjectionExec 
→ DataSourceExec(ParquetSource with predicate)`. Asserts that the top 
`FilterExec`'s predicate and the
pushed-down `ParquetSource` predicate end up sharing `inner` and that an 
`update()` at the top is observed at the scan site.
   
   Removed: the pre-existing `test_expression_deduplication_arc_sharing`, 
`test_deduplication_within_plan_deserialization`, 
`test_deduplication_within_expr_deserialization`, and two 
`test_session_id_rotation_*` tests — they asserted the generic Arc-ptr dedup 
contract that this PR deliberately drops.
   
   `cargo fmt --all` and `cargo clippy --all-targets --all-features` run clean 
on the affected crates. The full `cargo test -p datafusion-proto -p 
datafusion-physical-expr` suite passes.
   
   ## Are there any user-facing changes?
   
   - `PhysicalExpr::expression_id` is a new trait method with a safe default 
(`None`), so existing implementations keep compiling.
   - `DeduplicatingProtoConverter` no longer deduplicates arbitrary expressions 
by Arc pointer; plans that relied on that happening for, say, a large shared 
`InList` will serialize independently until a per-type `expression_id()` is 
added. Dynamic filters now round-trip with shared state, which is the primary 
motivation.
   - New public `DynamicFilterPhysicalExpr` surface: `with_id`, 
`with_id_and_state`, `original_children`, `raw_inner_expr`, `raw_inner_state`.
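   For the compatibility point, a stripped-down stand-in (a hypothetical 
`PhysicalExprLike` trait, not the real `PhysicalExpr`) shows why existing 
implementations keep compiling:

   ```rust
   // Hypothetical stand-in trait; the real PhysicalExpr has many more methods.
   trait PhysicalExprLike {
       fn name(&self) -> &str;
       // Newly added method with a default body: existing impls that do not
       // mention it continue to compile and simply report "no identity".
       fn expression_id(&self) -> Option<u64> {
           None
       }
   }

   // A pre-existing implementation, unchanged by the new trait method.
   struct MyExpr;
   impl PhysicalExprLike for MyExpr {
       fn name(&self) -> &str {
           "my_expr"
       }
   }

   fn main() {
       assert_eq!(MyExpr.name(), "my_expr");
       assert_eq!(MyExpr.expression_id(), None); // safe default
   }
   ```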
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

