adriangb opened a new pull request, #21786:

URL: https://github.com/apache/datafusion/pull/21786
## Which issue does this PR close?

- Alternative to #20416. Informs https://github.com/datafusion-contrib/datafusion-distributed/issues/180 and closes #20418.

## Rationale for this change

Given a plan like

```text
HashJoinExec(dynamic_filter_1 on a@0)
    (...left side of join)
    ProjectionExec(a := Column("a", source_index))
        DataSourceExec
            ParquetSource(predicate = dynamic_filter_2)
```

after serialize/deserialize the two `DynamicFilterPhysicalExpr` wrappers should still share the same mutable `inner`, so that a `HashJoinExec` update is visible at the pushed-down `ParquetSource`. Today this breaks for two reasons:

1. `serialize_physical_expr_with_converter` calls `snapshot_physical_expr`, which replaces `DynamicFilterPhysicalExpr` with its current inner expression (often `lit(true)`), so identity is lost.
2. The existing dedup key hashes the outer `Arc::as_ptr`, but the `HashJoinExec` side and the `ParquetSource` side hold different outer `Arc`s (one comes from `with_new_children`), so they never share `expr_id`.

## What changes are included in this PR?

- `PhysicalExpr::expression_id(&self) -> Option<u64>` added to the trait, defaulting to `None`. Only `DynamicFilterPhysicalExpr` reports an identity today.
- `DynamicFilterPhysicalExpr` gains a stable `u64` id (random at construction). It follows the shared `Arc<RwLock<Inner>>` across `with_new_children`: two wrappers that observe the same mutable state report the same id. `id` is omitted from `Debug` to keep plan snapshots deterministic.
- New `with_id_and_state(id, children, inner, generation, is_complete)` constructor used on the deserialize side to rehydrate the filter. `generation` and `is_complete` survive the proto round-trip for fidelity (matching the state that already lives on `Inner`). `state_watch` is always fresh on the receiver; cross-process update propagation remains out of scope.
- New proto message `PhysicalDynamicFilterExprNode { id, current_expr, original_children, effective_children, generation, is_complete }`. The serializer downcasts `DynamicFilterPhysicalExpr` and emits this variant directly; the top-level `snapshot_physical_expr` call is removed. The `HashTableLookupExpr → lit(true)` replacement still fires (unchanged).
- `DeduplicatingSerializer` is now stateless: it stamps `expr_id = expr.expression_id()`. The old `session_id`/`Arc::as_ptr`/`pid` hashing is dropped, along with the implicit within-process dedup for generic exprs. A follow-up can restore that for specific types (e.g. `InList`) by implementing `expression_id()` on them.
- `DeduplicatingDeserializer` intercepts `DynamicFilter` before the generic cache lookup: the cache stores the canonical (un-remapped) wrapper keyed by `id`, and each call site's `effective_children` gets applied via `with_new_children` to produce the site-specific wrapper while sharing the canonical's `inner`.

Compared to #20416: that PR introduces a generic `PhysicalExprId { exact, shallow }` and keeps the Arc-ptr default so every expression is stamped. This PR instead makes the identity hook opt-in per type (`expression_id()`), restricting the blast radius to the one type that actually needs it and letting the follow-up decide per type whether generic dedup is worth re-introducing.

## Are these changes tested?

Yes:

- `dynamic_filters::test::test_expression_id_stable_across_with_new_children`: unit test verifying the id survives `with_new_children`.
- `roundtrip_dynamic_filter_preserves_shared_inner`: two wrappers in a `BinaryExpr(And)` predicate share `inner` after round-trip; an `update()` on one is observable via the other.
- `roundtrip_dynamic_filter_preserves_remapped_children`: two wrappers with different effective children; each preserves its site-specific projection after round-trip, both share identity, `update()` propagates, and `current()` on the remapped side applies the column substitution.
- `roundtrip_dynamic_filter_in_parquet_pushdown`: a plan shaped like the one in the rationale, with a `FilterExec` at the top (`FilterExec → ProjectionExec → DataSourceExec(ParquetSource with predicate)`). Asserts that the top `FilterExec`'s predicate and the pushed-down `ParquetSource` predicate end up sharing `inner` and that an `update()` at the top is observed at the scan site.

Removed: the pre-existing `test_expression_deduplication_arc_sharing`, `test_deduplication_within_plan_deserialization`, `test_deduplication_within_expr_deserialization`, and two `test_session_id_rotation_*` tests. They asserted the generic Arc-ptr dedup contract that this PR deliberately drops.

`cargo fmt --all` and `cargo clippy --all-targets --all-features` are clean on the affected crates. The full `cargo test -p datafusion-proto -p datafusion-physical-expr` run passes.

## Are there any user-facing changes?

- `PhysicalExpr::expression_id` is a new trait method with a safe default (`None`), so existing implementations keep compiling.
- `DeduplicatingProtoConverter` no longer deduplicates arbitrary expressions by Arc pointer; plans that relied on that happening for, say, a large shared `InList` will serialize independently until a per-type `expression_id()` is added. Dynamic filters now round-trip with shared state, which is the primary motivation.
- New public `DynamicFilterPhysicalExpr` surface: `with_id`, `with_id_and_state`, `original_children`, `raw_inner_expr`, `raw_inner_state`.

🤖 Generated with [Claude Code](https://claude.com/claude-code)
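The identity-sharing scheme described under "What changes are included in this PR?" can be illustrated with a small, std-only Rust sketch. It does not use DataFusion's real types: `PhysicalExprLike`, `OtherExpr`, `Inner`, `DynamicFilter`, `DynamicFilterNode`, `serialize` and `Deserializer` are hypothetical, simplified stand-ins (children are plain strings, there is no original/remapped split, the proto message is a plain struct, and the id is fixed instead of random). Under those assumptions it shows an opt-in `expression_id()` hook defaulting to `None`, a wrapper whose id and `Arc<RwLock<Inner>>` both carry over through `with_new_children`, and a deserializer that caches one canonical wrapper per id and re-applies each call site's effective children.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `PhysicalExpr`, reduced to the opt-in identity hook.
trait PhysicalExprLike {
    /// Types without a stable identity keep the default `None`.
    fn expression_id(&self) -> Option<u64> { None }
}

/// Any expression type that does not opt in (hypothetical).
struct OtherExpr;
impl PhysicalExprLike for OtherExpr {}

/// Shared mutable state, analogous to the PR's `Arc<RwLock<Inner>>`.
struct Inner {
    expr: String,
    generation: u64,
}

/// Simplified dynamic filter (hypothetical): children are plain strings.
struct DynamicFilter {
    id: u64,
    children: Vec<String>,
    inner: Arc<RwLock<Inner>>,
}

impl DynamicFilter {
    fn new(id: u64, children: Vec<String>, expr: &str) -> Self {
        let inner = Arc::new(RwLock::new(Inner { expr: expr.to_string(), generation: 0 }));
        DynamicFilter { id, children, inner }
    }

    /// Keeps the id and clones the `Arc`, so both wrappers observe the same state.
    fn with_new_children(&self, children: Vec<String>) -> Self {
        DynamicFilter { id: self.id, children, inner: Arc::clone(&self.inner) }
    }

    fn update(&self, expr: &str) {
        let mut state = self.inner.write().unwrap();
        state.expr = expr.to_string();
        state.generation += 1;
    }

    fn current(&self) -> String { self.inner.read().unwrap().expr.clone() }
}

impl PhysicalExprLike for DynamicFilter {
    fn expression_id(&self) -> Option<u64> { Some(self.id) }
}

/// Hypothetical wire form, loosely modeled on `PhysicalDynamicFilterExprNode`.
struct DynamicFilterNode {
    id: u64,
    current_expr: String,
    effective_children: Vec<String>,
    generation: u64,
}

fn serialize(f: &DynamicFilter) -> DynamicFilterNode {
    let state = f.inner.read().unwrap();
    DynamicFilterNode {
        id: f.id,
        current_expr: state.expr.clone(),
        effective_children: f.children.clone(),
        generation: state.generation,
    }
}

/// Caches one canonical wrapper per id; later call sites reuse its `inner`.
#[derive(Default)]
struct Deserializer {
    cache: HashMap<u64, DynamicFilter>,
}

impl Deserializer {
    fn deserialize(&mut self, node: &DynamicFilterNode) -> DynamicFilter {
        let canonical = self.cache.entry(node.id).or_insert_with(|| {
            let inner = Inner { expr: node.current_expr.clone(), generation: node.generation };
            DynamicFilter { id: node.id, children: node.effective_children.clone(), inner: Arc::new(RwLock::new(inner)) }
        });
        // Each call site re-applies its own children while sharing `inner`.
        canonical.with_new_children(node.effective_children.clone())
    }
}

fn main() {
    // Fixed id for determinism; the PR generates a random one.
    let join_side = DynamicFilter::new(42, vec!["a@0".to_string()], "true");
    // Remapped copy, like the one pushed down to the scan.
    let scan_side = join_side.with_new_children(vec!["a@1".to_string()]);
    assert_eq!(join_side.expression_id(), scan_side.expression_id());
    assert_eq!(OtherExpr.expression_id(), None);

    let mut de = Deserializer::default();
    let join_rt = de.deserialize(&serialize(&join_side));
    let scan_rt = de.deserialize(&serialize(&scan_side));
    assert!(Arc::ptr_eq(&join_rt.inner, &scan_rt.inner));
    assert_eq!(scan_rt.children, vec!["a@1".to_string()]);

    // An update on the deserialized join side is visible at the scan side.
    join_rt.update("a@0 >= 10");
    assert_eq!(scan_rt.current(), "a@0 >= 10");
}
```

Keying the cache on the reported id rather than on `Arc::as_ptr` is what lets the join-side and scan-side wrappers, which are distinct `Arc`s, converge on a single shared `inner` after deserialization.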
