notfilippo opened a new pull request, #21829:
URL: https://github.com/apache/datafusion/pull/21829

   ## Summary
   
   Closes #13525 (RFC: extensible logical planning pipeline).
   
   Replaces DataFusion's four hardcoded planning slots (`Analyzer`, 
`Optimizer`, `AsyncAnalyzer`, `AsyncOptimizer`) with a single, ordered, 
extensible `LogicalPlanningPipeline`: a `Vec<Phase>` in which each phase is 
named, can be enabled or disabled, carries a strategy, and holds rules of a 
single type.
   
   Design inspired by **Spark SQL's** `Batch(name, strategy, rules*)` model and 
**Apache Calcite's** `HepPlanner` batches.
   
   ### New types (`datafusion-optimizer`)
   
   | Type | Description |
   |---|---|
   | `Strategy` | `Once` or `FixedPoint { max_passes: Option<usize> }` |
   | `SyncPhase<T>` | Named, enabled/disabled phase with sync rules |
   | `AsyncPhase<T>` | Same, but rules are async |
   | `Phase` | Enum over `SyncAnalysis`, `SyncOptimization`, `AsyncAnalysis`, `AsyncOptimization` |
   | `LogicalPlanningPipeline` | Ordered `Vec<Phase>` with `apply_sync()`, `apply_sync_explained()`, `apply()` |
   
   Type aliases: `SyncAnalysisPhase`, `SyncOptimizationPhase`, 
`AsyncAnalysisPhase`, `AsyncOptimizationPhase`.
   
   Two new traits:
   - `AsyncAnalyzerRule` — async counterpart to `AnalyzerRule`
   - `AsyncOptimizerRule` — async counterpart to `OptimizerRule`
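
To make the `Strategy` semantics from the table concrete, here is a minimal, self-contained sketch of how `Once` and `FixedPoint { max_passes }` might drive a phase. It is a toy model, not the code in this PR: `Plan`, `Rule`, `halve_evens`, and `run_phase` are hypothetical stand-ins, and only the shape of the `Strategy` enum is taken from the description above.

```rust
// Toy model only: `Plan`, `Rule`, `halve_evens`, and `run_phase` are
// hypothetical names for illustration, not DataFusion types or functions.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Strategy {
    Once,
    FixedPoint { max_passes: Option<usize> },
}

/// A stand-in "plan": just a list of integers that rules rewrite.
pub type Plan = Vec<i64>;
pub type Rule = fn(Plan) -> Plan;

/// Hypothetical rule: halve every even number, leave odd numbers alone.
pub fn halve_evens(plan: Plan) -> Plan {
    plan.into_iter()
        .map(|x| if x % 2 == 0 { x / 2 } else { x })
        .collect()
}

/// Runs all rules in order, repeating as dictated by `strategy`.
/// Returns the final plan and the number of passes that were executed.
pub fn run_phase(strategy: Strategy, rules: &[Rule], mut plan: Plan) -> (Plan, usize) {
    // `Once` is a pass limit of 1; `FixedPoint` may be bounded or unbounded.
    let limit = match strategy {
        Strategy::Once => Some(1),
        Strategy::FixedPoint { max_passes } => max_passes,
    };
    let mut passes = 0;
    while limit.map_or(true, |max| passes < max) {
        let before = plan.clone();
        for rule in rules {
            plan = rule(plan);
        }
        passes += 1;
        if plan == before {
            break; // a full pass changed nothing: fixed point reached
        }
    }
    (plan, passes)
}
```

With `halve_evens` applied to `[8, 3]`, `Once` yields `[4, 3]`, while an unbounded `FixedPoint` settles on `[1, 3]` and spends one extra pass confirming that nothing changed.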
   
   ### Default pipeline
   
   Two phases that replicate existing behavior exactly:
   
   ```
   "analysis"      SyncAnalysis      FixedPoint  (Analyzer::new().rules)
   "optimization"  SyncOptimization  FixedPoint  (Optimizer::new().rules)
   ```
   
   Named constants: `DEFAULT_ANALYSIS_PHASE`, `DEFAULT_OPTIMIZATION_PHASE`.
   
   ### Correctness preserved
   
   - `SyncOptimizationPhase::apply()` uses `LogicalPlanSignature` for early 
FixedPoint convergence.
   - `SyncAnalysisPhase::apply()` runs `check_invariants(Always)` before and 
`check_invariants(Executable)` after.
   - `SyncOptimizationPhase::apply()` runs `check_invariants(Executable)` 
before, `assert_expected_schema` per-rule, and a final schema assertion.
   - `function_rewrites` are threaded into the analysis phase via 
`ApplyFunctionRewrites` (matching original `Analyzer::execute_and_check` 
behavior).
   - `apply_sync_explained()` captures the `AnalyzedLogicalPlan`, 
`FinalAnalyzedLogicalPlan`, and `OptimizedLogicalPlan` `StringifiedPlan`s, so 
EXPLAIN VERBOSE per-rule output is preserved.
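
The first bullet above mentions signature-based early convergence. The sketch below illustrates the general idea under stated assumptions: `signature` and `run_to_fixed_point` are hypothetical helpers that hash a whole plan and stop as soon as a pass produces a signature that has already been seen. This stops both when the plan is unchanged and when rules start cycling. It is not the `LogicalPlanSignature` implementation, only a toy analogue using the standard library.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashSet;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a plan signature: a hash of the whole plan.
pub fn signature<P: Hash>(plan: &P) -> u64 {
    let mut hasher = DefaultHasher::new();
    plan.hash(&mut hasher);
    hasher.finish()
}

/// Applies `pass` repeatedly until it yields a plan whose signature was
/// already seen, or until `max_passes` is reached.
/// Returns the final plan and the number of passes that were executed.
pub fn run_to_fixed_point<P, F>(mut plan: P, max_passes: usize, pass: F) -> (P, usize)
where
    P: Hash,
    F: Fn(P) -> P,
{
    let mut seen = HashSet::new();
    seen.insert(signature(&plan));
    let mut passes = 0;
    while passes < max_passes {
        plan = pass(plan);
        passes += 1;
        // `insert` returns false when the signature was already present,
        // i.e. the plan is unchanged or the rules have started to cycle.
        if !seen.insert(signature(&plan)) {
            break;
        }
    }
    (plan, passes)
}
```

Comparing compact signatures rather than whole plans keeps the check cheap. The trade-off is that a hash collision could end the loop one pass early, which a real implementation would need to weigh.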
   
   ### SessionState changes
   
   ```rust
   // Before
   analyzer: Analyzer,
   optimizer: Optimizer,
   async_analyzer: AsyncAnalyzer,
   async_optimizer: AsyncOptimizer,
   
   // After
   logical_pipeline: LogicalPlanningPipeline,
   function_rewrites: Vec<Arc<dyn FunctionRewrite + Send + Sync>>,
   ```
   
   `optimize()` handles `Explain` nodes by calling `apply_sync_explained()` and 
collecting per-rule `StringifiedPlan`s. `optimize_with_config()` runs only the 
optimization phase (used by PREPARE). `create_physical_plan()` calls 
`pipeline.apply().await`.
   
   `analyzer()` and `optimizer()` are kept for backward compatibility but now 
return owned values reconstructed from the pipeline (minor breaking change — 
previously returned references).
   
   ### SessionStateBuilder API
   
   ```rust
   // Replace whole pipeline
   .with_logical_pipeline(LogicalPlanningPipeline)
   
   // Insert custom phases
   .with_phase(phase)                     // push to end
   .with_phase_before("analysis", phase)  // insert before named phase
   .with_phase_after("analysis", phase)   // insert after named phase
   
   // Backward-compat (mutate default phase rules in place)
   .with_analyzer_rules(rules)
   .with_optimizer_rules(rules)
   ```
   
   Async phases are registered explicitly by constructing a named 
`AsyncAnalysisPhase` or `AsyncOptimizationPhase` and inserting it with 
`with_phase_before` / `with_phase_after`. No hidden auto-inserted phases.
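
The ordering semantics of `with_phase`, `with_phase_before`, and `with_phase_after` can be pictured with a toy model. `ToyPipeline` below is hypothetical and tracks phase names only. Returning an error when the anchor phase does not exist is an assumption made for this sketch, since the description above does not say how the real builder handles an unknown name.

```rust
/// Hypothetical toy pipeline that tracks phase names only.
#[derive(Debug, Default)]
pub struct ToyPipeline {
    phases: Vec<String>,
}

impl ToyPipeline {
    /// Finds the index of the phase called `anchor`.
    fn position(&self, anchor: &str) -> Result<usize, String> {
        self.phases
            .iter()
            .position(|p| p == anchor)
            .ok_or_else(|| format!("no phase named {anchor:?}"))
    }

    /// Appends a phase to the end of the pipeline.
    pub fn with_phase(mut self, name: &str) -> Self {
        self.phases.push(name.to_string());
        self
    }

    /// Inserts a phase immediately before `anchor`.
    /// Assumption: an unknown anchor is reported as an error.
    pub fn with_phase_before(mut self, anchor: &str, name: &str) -> Result<Self, String> {
        let idx = self.position(anchor)?;
        self.phases.insert(idx, name.to_string());
        Ok(self)
    }

    /// Inserts a phase immediately after `anchor`.
    pub fn with_phase_after(mut self, anchor: &str, name: &str) -> Result<Self, String> {
        let idx = self.position(anchor)?;
        self.phases.insert(idx + 1, name.to_string());
        Ok(self)
    }

    /// Phase names in execution order.
    pub fn names(&self) -> Vec<&str> {
        self.phases.iter().map(String::as_str).collect()
    }
}
```

Starting from the two default names `"analysis"` and `"optimization"`, inserting `"pre"` before `"analysis"` and `"post"` after it gives the order `pre`, `analysis`, `post`, `optimization`.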
   
   ### Extension example (async pre-analysis phase)
   
   ```rust
   let mut phase = AsyncAnalysisPhase::new("my-async-analysis", Strategy::Once);
   phase.rules.push(Arc::new(MyAsyncRule));
   
   let state = SessionStateBuilder::new()
       .with_phase_before(DEFAULT_ANALYSIS_PHASE, Phase::AsyncAnalysis(phase))
       .build();
   ```
   
   ### Known gaps and open questions
   
   - `analyzer()` / `optimizer()` now return owned values (previously 
`&Analyzer` / `&Optimizer`). A caching layer could restore reference semantics 
if needed.
   - Async phases do not contribute to EXPLAIN VERBOSE output (only sync phases 
are covered by `apply_sync_explained`).
   - `SessionStateBuilder::analyzer()` / `::optimizer()` mutable accessor 
methods are removed (replaced by `with_logical_pipeline` / `with_phase_*`).
   
   ## Test plan
   
   - [x] `cargo build -p datafusion-optimizer -p datafusion`
   - [x] `cargo clippy -p datafusion-optimizer -p datafusion --all-features -- -D warnings`
   - [x] `cargo test -p datafusion-optimizer`
   - [x] `cargo test -p datafusion --lib -- execution::session_state`
   - [ ] Integration test: async phase counter increment via 
`with_phase_before` + `collect()`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

