SubhamSinghal commented on code in PR #21479: URL: https://github.com/apache/datafusion/pull/21479#discussion_r3055566331
########## datafusion/physical-optimizer/src/window_topn.rs: ########## @@ -0,0 +1,330 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`WindowTopN`] optimizer rule for per-partition top-K window queries. +//! +//! Detects queries of the form: +//! +//! ```sql +//! SELECT * FROM ( +//! SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn +//! FROM t +//! ) WHERE rn <= K; +//! ``` +//! +//! And replaces the `FilterExec → BoundedWindowAggExec → SortExec` pipeline +//! with `BoundedWindowAggExec → PartitionedTopKExec(fetch=K)`, removing both +//! the `FilterExec` and `SortExec`. +//! +//! See [`PartitionedTopKExec`] +//! for details on the replacement operator. + +use std::sync::Arc; + +use crate::PhysicalOptimizerRule; +use arrow::datatypes::DataType; +use datafusion_common::config::ConfigOptions; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr::Operator; +use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal}; +use datafusion_physical_expr::window::StandardWindowExpr; +use datafusion_physical_plan::ExecutionPlan; +use datafusion_physical_plan::filter::FilterExec; +use datafusion_physical_plan::projection::ProjectionExec; +use datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec; +use datafusion_physical_plan::sorts::sort::SortExec; +use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr}; + +/// Physical optimizer rule that converts per-partition `ROW_NUMBER` top-K +/// queries into a more efficient plan using [`PartitionedTopKExec`]. +/// +/// # Pattern Detected +/// +/// ```text +/// FilterExec(rn <= K) +/// [optional ProjectionExec] +/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...) +/// SortExec(partition_keys, order_keys) +/// ``` +/// +/// # Replacement +/// +/// ```text +/// [optional ProjectionExec] +/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...) +/// PartitionedTopKExec(partition_keys, order_keys, fetch=K) +/// ``` +/// +/// The `FilterExec` is removed entirely (all output rows have `rn ∈ {1..K}`). +/// The `SortExec` is replaced by `PartitionedTopKExec` which maintains a +/// per-partition top-K heap instead of sorting the entire dataset. +/// +/// # Supported Predicates +/// +/// - `rn <= K` → fetch = K +/// - `rn < K` → fetch = K - 1 +/// - `K >= rn` (flipped) → fetch = K +/// - `K > rn` (flipped) → fetch = K - 1 +/// +/// # When the Rule Does NOT Fire +/// +/// - Window function is not `ROW_NUMBER` (e.g., `RANK`, `DENSE_RANK`) +/// - No `PARTITION BY` clause (global top-K is already handled by +/// `SortExec` with `fetch`) +/// - Filter predicate is on a data column, not the window output column +/// - `FilterExec` has an embedded projection +/// - Child of `BoundedWindowAggExec` is not a `SortExec` +/// - Config flag `enable_window_topn` is `false` +/// +/// [`PartitionedTopKExec`]: datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec +#[derive(Default, Clone, Debug)] +pub struct WindowTopN; + +impl WindowTopN { + pub fn new() -> Self { + Self + } + + /// Attempt to transform a single plan node. + /// + /// Returns `Some(new_plan)` if the node matches the + /// `FilterExec → [ProjectionExec] → BoundedWindowAggExec → SortExec` + /// pattern and can be rewritten, or `None` if the node should be + /// left unchanged. + fn try_transform(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> { + // Step 1: Match FilterExec at the top + let filter = plan.downcast_ref::<FilterExec>()?; + + // Don't handle filters with projections Review Comment: The filter's column indices would point to the projected schema, not the window exec's output schema, so our index-based matching for the ROW_NUMBER column would be wrong without resolving the projection mapping. Skipping this case for simplicity right now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
