mbutrovich opened a new pull request, #4215: URL: https://github.com/apache/datafusion-comet/pull/4215
## Which issue does this PR close?

Closes #4022.

## Rationale for this change

Under AQE, Spark's `PlanAdaptiveDynamicPruningFilters` rewrites `SubqueryAdaptiveBroadcastExec` (SAB) to `SubqueryBroadcastExec` so DPP filters reuse the join's already-materialized broadcast. For Iceberg native scans this rewrite was a no-op: `CometIcebergNativeScanExec` kept `runtimeFilters` inside its `@transient originalPlan`, where neither Spark's expression walks nor our own `transformExpressionsUp` passes could see them. The SAB stayed unconverted, and the dim table executed a second time as a standalone broadcast.

#4112 fixed the equivalent problem for V1 native Parquet by lifting `runtimeFilters` to a top-level constructor field and using Spark's standard `prepare` / `waitForSubqueries` flow. This PR applies the same design to V2 Iceberg, replacing the earlier prototype in #4033, and aligns `CometNativeScanExec` and `CometIcebergNativeScanExec` so both scans go through the same DPP and subquery resolution path.

## What changes are included in this PR?

- Lifted `runtimeFilters` to a top-level constructor field on `CometIcebergNativeScanExec` so Spark's `productIterator`-based expression walks (and our `transformExpressionsUp` passes) see and rewrite it directly. Mirrors `BatchScanExec` and matches the V1 design from #4112.
- Added `CometLeafExec.ensureSubqueriesResolved()`, bridging Comet's custom `findAllPlanData` data-collection path with Spark's standard `prepare` -> `waitForSubqueries` flow. Removes the deadlock-prone reflection hack from #4033 and eliminates ad-hoc double-checked locking.
- Refactored `CometNativeScanExec` to use the same flow (dropped its redundant `doPrepare` override and outer DPP filter loop) so V1 and V2 stay in sync.
- New Iceberg branch in `CometPlanAdaptiveDynamicPruningFilters` (3.5+) that converts the SAB inside `runtimeFilters` to `CometSubqueryBroadcastExec` (or `SubqueryBroadcastExec` if the join fell back to vanilla Spark). Matches by `buildKeys` exprIds to disambiguate multiple broadcast joins, and rewrites to `Literal.TrueLiteral` when no matching broadcast join exists (e.g., SMJ) so DPP is disabled but results stay correct. On 3.4, Iceberg falls back without reuse: `CometSpark34AqeDppFallbackRule` walks scan `partitionFilters`, which `BatchScanExec` doesn't have.
- `LazyIcebergMetric` defers metric value resolution. `SparkPlanInfo.fromSparkPlan` reads the metrics map for SQL UI events at planning time, before AQE's queryStageOptimizerRules run; without deferral that read would trigger `serializedPartitionData` against an unconverted SAB.
- `serializedPartitionData` rebuilds `originalPlan` with the current top-level `runtimeFilters` before serializing. Otherwise Spark's `PlanAdaptiveDynamicPruningFilters` rewrite is invisible to the `@transient originalPlan`, and `serializePartitions` re-translates the unresolved `InSubqueryExec`.
- Handle the `ParallelCollectionRDD` shape that `BatchScanExec.inputRDD` returns when DPP prunes all partitions (matched by class name since it is `private[spark]`).

## How are these changes tested?

- 14 new AQE DPP tests in `CometIcebergNativeSuite` covering broadcast reuse, multiple DPP filters sharing a broadcast, buildKeys-based disambiguation across joins, BHJ fallback to vanilla Spark, SMJ (no broadcast) graceful disable, empty broadcast pruning all partitions, cross-stage scalar subqueries, and the V2 SPJ shape variations across Spark versions.
- Verified on Spark 3.4, 3.5, 4.0, and 4.1 for `CometIcebergNativeSuite`, `CometExecSuite`, `CometDppFallbackRepro3949Suite`, and `CometShuffleFallbackStickinessSuite`.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
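The core design point behind lifting `runtimeFilters` into the constructor — `productIterator`-based tree walks only see constructor parameters, never plain fields — can be sketched with stand-in types (none of these classes are the real Spark or Comet ones):

```scala
// Illustrative stand-ins only; not the real Spark/Comet plan classes.
case class Filter(name: String)

// Before: runtimeFilters lives in a non-constructor field, so a generic
// productIterator walk over constructor parameters never yields it.
case class ScanBefore(output: Seq[String]) {
  @transient val runtimeFilters: Seq[Filter] = Seq(Filter("dpp"))
}

// After: runtimeFilters is a constructor parameter (mirroring
// BatchScanExec), so generic expression walks can see and rewrite it.
case class ScanAfter(output: Seq[String], runtimeFilters: Seq[Filter])

// A minimal productIterator-based walk, in the spirit of the traversals
// Spark's expression rewrites rely on.
def visibleFilters(p: Product): Seq[Filter] =
  p.productIterator.flatMap {
    case s: Seq[_] => s.collect { case f: Filter => f }
    case f: Filter => Seq(f)
    case _         => Seq.empty
  }.toSeq

val hidden  = visibleFilters(ScanBefore(Seq("a")))
val visible = visibleFilters(ScanAfter(Seq("a"), Seq(Filter("dpp"))))
assert(hidden.isEmpty)    // invisible to any generic rewrite
assert(visible.nonEmpty)  // rewritable in place
```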
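The `ensureSubqueriesResolved()` bridge can be sketched as an idempotent funnel into the prepare-then-wait sequence; the trait below is a simplified assumption about its shape, not Comet's actual code:

```scala
// Hedged sketch: route every consumer through the standard
// prepare() -> waitForSubqueries() order exactly once per node,
// replacing ad-hoc double-checked locking. Stand-in trait.
trait LeafLike {
  private var prepared = false
  protected def prepareSubqueries(): Unit // submit subquery futures
  protected def waitForSubqueries(): Unit // block until results land

  // Safe to call from any code path that needs resolved subqueries.
  final def ensureSubqueriesResolved(): Unit = synchronized {
    if (!prepared) {
      prepareSubqueries()
      prepared = true
    }
    waitForSubqueries()
  }
}

var prepares, waits = 0
object Scan extends LeafLike {
  protected def prepareSubqueries(): Unit = prepares += 1
  protected def waitForSubqueries(): Unit = waits += 1
}
Scan.ensureSubqueriesResolved()
Scan.ensureSubqueriesResolved()
assert(prepares == 1) // preparation happens exactly once
assert(waits == 2)    // waiting is safe to repeat
```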
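The matching step described for the new Iceberg branch — pick the broadcast join whose `buildKeys` expression IDs line up with the subquery's keys, else disable DPP with a true literal — might look roughly like this (stand-in types; `rewrite` is a hypothetical name):

```scala
// Simplified stand-ins for expression IDs and candidate joins.
case class ExprId(id: Long)
case class BroadcastJoin(buildKeys: Seq[ExprId])

sealed trait DppFilter
case object TrueLiteral extends DppFilter // DPP disabled, results still correct
case class ReuseBroadcast(join: BroadcastJoin) extends DppFilter

// Match by buildKeys exprIds to disambiguate multiple broadcast joins;
// no match (e.g. the join is a sort-merge join) means there is no
// materialized broadcast to reuse, so rewrite to an always-true filter.
def rewrite(subqueryKeys: Seq[ExprId], joins: Seq[BroadcastJoin]): DppFilter =
  joins.find(_.buildKeys == subqueryKeys) match {
    case Some(j) => ReuseBroadcast(j)
    case None    => TrueLiteral
  }

val j1 = BroadcastJoin(Seq(ExprId(1)))
val j2 = BroadcastJoin(Seq(ExprId(2)))
assert(rewrite(Seq(ExprId(2)), Seq(j1, j2)) == ReuseBroadcast(j2))
assert(rewrite(Seq(ExprId(9)), Seq(j1, j2)) == TrueLiteral)
```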
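The timing problem `LazyIcebergMetric` solves — metric values being read during planning, before AQE's queryStageOptimizerRules run — comes down to deferring resolution behind a thunk (class name and shape below are illustrative):

```scala
// Defer the ordering-sensitive computation until the value is actually
// read, not when the metrics map is merely constructed.
final class LazyMetric(compute: () => Long) {
  lazy val value: Long = compute()
}

var triggered = false
// Building the metric (and placing it in a map read by SQL UI events)
// must not resolve it; otherwise the equivalent of
// serializedPartitionData would fire against an unconverted SAB.
val metrics = Map("numPartitions" -> new LazyMetric(() => { triggered = true; 7L }))
assert(!triggered)                           // planning time: untouched
assert(metrics("numPartitions").value == 7L) // execution time: resolved
assert(triggered)
```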
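The `serializedPartitionData` fix — copy the current top-level `runtimeFilters` back into the transient original plan before serializing — amounts to a copy-back pattern like the one below (stand-in case classes, not the real plan nodes):

```scala
// Stand-ins: the outer Comet node holds the authoritative, rewritten
// runtimeFilters; the transient original plan holds stale ones.
case class Pred(sql: String)
case class OriginalPlan(runtimeFilters: Seq[Pred])
case class CometScan(runtimeFilters: Seq[Pred], @transient original: OriginalPlan) {
  // Rebuild the original plan with the current filters so a rewrite
  // applied to the outer node is not lost when serializing.
  def planToSerialize: OriginalPlan = original.copy(runtimeFilters = runtimeFilters)
}

val stale = OriginalPlan(Seq(Pred("in_subquery(unresolved)")))
val scan  = CometScan(Seq(Pred("reuse_broadcast")), stale)
assert(scan.planToSerialize.runtimeFilters == Seq(Pred("reuse_broadcast")))
```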
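Matching `ParallelCollectionRDD` by class name, since the class is `private[spark]` and cannot be pattern-matched directly, might look like the helper below (the method name is hypothetical):

```scala
// DPP can prune every partition, in which case BatchScanExec.inputRDD
// returns a ParallelCollectionRDD; since that class is private[spark],
// the portable check is against the fully-qualified class name.
def isEmptyPrunedRdd(rdd: AnyRef): Boolean =
  rdd.getClass.getName == "org.apache.spark.rdd.ParallelCollectionRDD"

// Any non-Spark object fails the check.
assert(!isEmptyPrunedRdd(List(1, 2, 3)))
```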
