2010YOUY01 commented on code in PR #21479:
URL: https://github.com/apache/datafusion/pull/21479#discussion_r3055436567
##########
datafusion/common/src/config.rs:
##########
@@ -1087,6 +1087,12 @@ config_namespace! {
/// past window functions, if possible
pub enable_window_limits: bool, default = true
+ /// When set to true, the optimizer will replace
+ /// Filter(rn<=K) → Window(ROW_NUMBER) → Sort patterns with a
+ /// PartitionedTopKExec that maintains per-partition heaps, avoiding
+ /// a full sort of the input.
+ pub enable_window_topn: bool, default = true
Review Comment:
I suggest defaulting this to `false`: for large partition counts, the
regression seems significant.
As a follow-up, we could detect the input cardinality and choose the right
plan automatically.
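A rough sketch of what such a follow-up could look like. Everything here is
hypothetical: `TopNPlan`, `choose_plan`, and the `max_heap_rows` budget are
made-up names, not DataFusion API. It also assumes the heap footprint
(fetch × distinct partitions) is a usable proxy for the regression, which has
not been verified in this review.

```rust
/// Hypothetical plan choices; these names are illustrative, not DataFusion API.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum TopNPlan {
    /// Keep the original Sort + Window + Filter pipeline.
    FullSort,
    /// Use per-partition heaps (the PartitionedTopKExec path).
    PartitionedTopK,
}

/// Picks a plan from estimated statistics.
///
/// `max_heap_rows` is an assumed budget for `fetch * distinct_partitions`,
/// the number of rows the heaps may hold in the worst case. Missing
/// statistics fall back to the full sort, the conservative choice.
pub fn choose_plan(
    distinct_partitions: Option<usize>,
    fetch: usize,
    max_heap_rows: usize,
) -> TopNPlan {
    match distinct_partitions {
        Some(p) if p.saturating_mul(fetch) <= max_heap_rows => TopNPlan::PartitionedTopK,
        _ => TopNPlan::FullSort,
    }
}
```

Falling back to `FullSort` when the estimate is missing keeps the behavior
consistent with a `false` default.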
##########
datafusion/core/examples/h2o_window_topn_bench.rs:
##########
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Standalone H2O groupby Q8 benchmark: PartitionedTopKExec enabled vs disabled
Review Comment:
We could keep this benchmark in this PR, but it would be great to clean it
up later.
To make benchmark maintenance easier, we could add queries representing this
workload directly to the h2o window benchmark, so that similar benchmarks
don't get scattered across multiple places.
https://github.com/apache/datafusion/blob/e1ad8713fec0f0c704dae1f290c6006273ad11be/benchmarks/bench.sh#L123
One issue is that the h2o benchmark currently counts the dataset loading time,
so we can't isolate the target executor's processing time. We could add an
option to exclude the data loading time later.
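To illustrate the idea of excluding load time, here is a minimal sketch that
times the two phases separately. `time_phases` is a hypothetical helper, not
part of the existing benchmark harness, and it uses only the standard library.

```rust
use std::time::{Duration, Instant};

/// Runs `load` once, then times only `run` over the loaded data.
/// Returns `(load_time, query_time, result)`.
pub fn time_phases<D, R>(
    load: impl FnOnce() -> D,
    run: impl FnOnce(&D) -> R,
) -> (Duration, Duration, R) {
    // Phase 1: data loading, measured on its own.
    let t0 = Instant::now();
    let data = load();
    let load_time = t0.elapsed();

    // Phase 2: the work we actually want to benchmark.
    let t1 = Instant::now();
    let result = run(&data);
    let query_time = t1.elapsed();

    (load_time, query_time, result)
}
```

A benchmark option could then report only `query_time`.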
##########
datafusion/physical-plan/src/sorts/partitioned_topk.rs:
##########
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`PartitionedTopKExec`]: Top-K per partition operator
+//!
+//! For queries like:
+//! ```sql
+//! SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn
+//! FROM t WHERE rn <= N
+//! ```
+//!
+//! Instead of sorting the entire dataset, this operator maintains a
+//! [`TopK`] heap per partition (reusing the existing TopK implementation)
+//! and emits only the top-K rows per partition in sorted order
+//! `(partition_keys, order_keys)`.
+
+use std::fmt::{self, Formatter};
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, UInt32Array};
+use arrow::compute::take_record_batch;
+use arrow::datatypes::SchemaRef;
+use arrow::row::{OwnedRow, RowConverter};
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{HashMap, Result};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use parking_lot::RwLock;
+
+use crate::execution_plan::{Boundedness, EmissionType};
+use crate::metrics::ExecutionPlanMetricsSet;
+use crate::topk::{TopK, TopKDynamicFilters, build_sort_fields};
+use crate::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream, stream::RecordBatchStreamAdapter,
+};
+
+/// Per-partition Top-K operator for window function queries.
+///
+/// # Background
+///
+/// "Top K per partition" is a common analytics pattern used for queries such as
+/// "find the top 3 products by revenue for each store". The (simplified) SQL
+/// for such a query might be:
+///
+/// ```sql
+/// SELECT * FROM (
+/// SELECT *, ROW_NUMBER() OVER (PARTITION BY store ORDER BY revenue DESC) as rn
+/// FROM sales
+/// ) WHERE rn <= 3;
+/// ```
+///
+/// The unoptimized physical plan would be:
+///
+/// ```text
+/// FilterExec: rn <= 3
+/// BoundedWindowAggExec: ROW_NUMBER() PARTITION BY [store] ORDER BY [revenue DESC]
+/// SortExec: expr=[store ASC, revenue DESC]
+/// DataSourceExec
+/// ```
+///
+/// This plan sorts the **entire** dataset (O(N log N)), computes `ROW_NUMBER`
+/// for **all** rows, and then filters to keep only the top K per partition.
+/// With 10M rows, 1K partitions, and K=3, it sorts all 10M rows but only
+/// keeps 3K.
+///
+/// # Optimization
+///
+/// `PartitionedTopKExec` replaces the `SortExec` and the `FilterExec` is
+/// removed. The optimized plan becomes:
+///
+/// ```text
+/// BoundedWindowAggExec: ROW_NUMBER() PARTITION BY [store] ORDER BY [revenue DESC]
+/// PartitionedTopKExec: fetch=3, partition=[store], order=[revenue DESC]
+/// DataSourceExec
+/// ```
+///
+/// Instead of sorting the entire dataset, this operator reads unsorted input,
+/// maintains a [`TopK`] heap per distinct partition key, and emits only the
+/// top-K rows per partition in sorted order `(partition_keys, order_keys)`.
+///
+/// Cost: O(N log K) time instead of O(N log N), and O(K × P × row_size)
+/// memory where K = fetch, P = number of distinct partitions.
+///
+/// # Example
+///
+/// For the query above with `fetch=3` and input:
+///
+/// ```text
+/// store | revenue
+/// ------|--------
+/// A | 100
+/// B | 50
+/// A | 200
+/// B | 150
+/// A | 300
+/// A | 400
+/// ```
+///
+/// The operator maintains two heaps:
+/// - **store=A**: keeps top-3 by revenue DESC → {400, 300, 200}, evicts 100
+/// - **store=B**: keeps top-3 by revenue DESC → {150, 50} (only 2 rows)
+///
+/// Output (sorted by store ASC, revenue DESC):
+///
+/// ```text
+/// store | revenue
+/// ------|--------
+/// A | 400
+/// A | 300
+/// A | 200
+/// B | 150
+/// B | 50
+/// ```
+///
+/// This is then passed to `BoundedWindowAggExec` which assigns
+/// `ROW_NUMBER` 1, 2, 3 to each partition, all of which satisfy `rn <= 3`.
+///
+/// # Limitations
+///
+/// - Only activated when the window function is `ROW_NUMBER` with a
+/// `PARTITION BY` clause. Global top-K (no `PARTITION BY`) is already
+/// handled efficiently by `SortExec` with `fetch`.
+/// - Memory usage is proportional to `K × P`. For very high cardinality
+/// partition keys (millions of distinct values), this may use significant
+/// memory.
+#[derive(Debug, Clone)]
+pub struct PartitionedTopKExec {
+ /// Input execution plan (reads unsorted data)
+ input: Arc<dyn ExecutionPlan>,
+ /// Full sort expressions: `[partition_keys..., order_keys...]`.
+ ///
+ /// For `PARTITION BY store ORDER BY revenue DESC` with sort
+ /// `[store ASC, revenue DESC]`, the first `partition_prefix_len`
+ /// expressions are the partition keys (`[store ASC]`) and the
+ /// remaining are the order-by keys (`[revenue DESC]`).
+ expr: LexOrdering,
+ /// Number of leading expressions in `expr` that define the partition
+ /// key. For example, `PARTITION BY a, b` → `partition_prefix_len = 2`.
+ partition_prefix_len: usize,
+ /// Maximum number of rows to keep per partition (the K in "top-K").
+ /// Derived from the filter predicate: `rn <= 3` → `fetch = 3`,
+ /// `rn < 3` → `fetch = 2`.
+ fetch: usize,
+ /// Execution metrics
+ metrics_set: ExecutionPlanMetricsSet,
+ /// Cached plan properties (output ordering, partitioning, etc.)
+ cache: Arc<PlanProperties>,
+}
+
+impl PartitionedTopKExec {
+ /// Create a new `PartitionedTopKExec`.
+ ///
+ /// # Arguments
+ ///
+ /// * `input` - The child execution plan providing unsorted input rows.
+ /// * `expr` - Full sort ordering `[partition_keys..., order_keys...]`.
+ /// For `PARTITION BY pk ORDER BY val ASC`, this would be `[pk ASC, val ASC]`.
+ /// * `partition_prefix_len` - Number of leading expressions in `expr`
+ /// that form the partition key. Must be >= 1.
+ /// * `fetch` - Maximum rows to retain per partition (the K in "top-K").
+ ///
+ /// # Example
+ ///
+ /// ```text
+ /// // For: ROW_NUMBER() OVER (PARTITION BY store ORDER BY revenue DESC) ... WHERE rn <= 5
+ /// PartitionedTopKExec::try_new(
+ /// data_source,
+ /// LexOrdering([store ASC, revenue DESC]),
+ /// 1, // partition_prefix_len: 1 partition column (store)
+ /// 5, // fetch: keep top 5 per partition
+ /// )
+ /// ```
+ pub fn try_new(
+ input: Arc<dyn ExecutionPlan>,
+ expr: LexOrdering,
+ partition_prefix_len: usize,
+ fetch: usize,
+ ) -> Result<Self> {
+ let cache = Self::compute_properties(&input, expr.clone())?;
+ Ok(Self {
+ input,
+ expr,
+ partition_prefix_len,
+ fetch,
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ cache: Arc::new(cache),
+ })
+ }
+
+ /// Returns the child execution plan.
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Returns the full sort ordering `[partition_keys..., order_keys...]`.
+ pub fn expr(&self) -> &LexOrdering {
+ &self.expr
+ }
+
+ /// Returns the number of leading expressions in [`Self::expr`] that
+ /// define the partition key.
+ pub fn partition_prefix_len(&self) -> usize {
+ self.partition_prefix_len
+ }
+
+ /// Returns the maximum number of rows retained per partition.
+ pub fn fetch(&self) -> usize {
+ self.fetch
+ }
+
+ /// Compute [`PlanProperties`] for this operator.
+ ///
+ /// The output is sorted by `sort_exprs` (partition keys then order keys),
+ /// uses the same partitioning as the input, emits all output at once
+ /// (`EmissionType::Final`), and is bounded.
+ fn compute_properties(
+ input: &Arc<dyn ExecutionPlan>,
+ sort_exprs: LexOrdering,
+ ) -> Result<PlanProperties> {
+ let mut eq_properties = input.equivalence_properties().clone();
+ eq_properties.reorder(sort_exprs)?;
+
+ Ok(PlanProperties::new(
+ eq_properties,
+ input.output_partitioning().clone(),
+ EmissionType::Final,
+ Boundedness::Bounded,
+ ))
+ }
+}
+
+impl DisplayAs for PartitionedTopKExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let partition_exprs: Vec<String> = self.expr[..self.partition_prefix_len]
+ .iter()
+ .map(|e| format!("{}", e.expr))
+ .collect();
+ let order_exprs: Vec<String> = self.expr[self.partition_prefix_len..]
+ .iter()
+ .map(|e| format!("{e}"))
+ .collect();
+ write!(
+ f,
+ "PartitionedTopKExec: fetch={}, partition=[{}], order=[{}]",
+ self.fetch,
+ partition_exprs.join(", "),
+ order_exprs.join(", "),
+ )
+ }
+ DisplayFormatType::TreeRender => {
+ writeln!(f, "fetch={}", self.fetch)?;
+ writeln!(f, "{}", self.expr)
Review Comment:
The tree format should also display the partition and order expressions. We
could also add simple sqllogictest cases for it, for example:
```
set datafusion.explain.format = tree;
explain ...
```
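As a self-contained illustration of the suggested output shape (not the actual
`DisplayAs` implementation), the tree body could split the expressions at
`partition_prefix_len` the same way the default format does. `tree_render` and
its plain-string inputs are stand-ins made up for this sketch; the real
operator holds a `LexOrdering`.

```rust
use std::fmt::Write;

/// Renders a tree-format body for a hypothetical operator description.
/// `exprs` stands in for the rendered sort expressions
/// `[partition_keys..., order_keys...]`.
pub fn tree_render(fetch: usize, exprs: &[&str], partition_prefix_len: usize) -> String {
    // Clamp the split point so an out-of-range prefix length cannot panic.
    let split = partition_prefix_len.min(exprs.len());
    let (partition, order) = exprs.split_at(split);

    let mut out = String::new();
    // Writing to a String cannot fail, so the results are ignored.
    let _ = writeln!(out, "fetch={fetch}");
    let _ = writeln!(out, "partition=[{}]", partition.join(", "));
    let _ = writeln!(out, "order=[{}]", order.join(", "));
    out
}
```

For `fetch=3`, `exprs = ["store", "revenue DESC"]`, and a prefix length of 1,
this yields three lines: `fetch=3`, `partition=[store]`, `order=[revenue DESC]`.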
##########
datafusion/physical-plan/src/sorts/partitioned_topk.rs:
##########
@@ -0,0 +1,498 @@
+impl DisplayAs for PartitionedTopKExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let partition_exprs: Vec<String> = self.expr[..self.partition_prefix_len]
+ .iter()
+ .map(|e| format!("{}", e.expr))
+ .collect();
+ let order_exprs: Vec<String> = self.expr[self.partition_prefix_len..]
+ .iter()
+ .map(|e| format!("{e}"))
+ .collect();
+ write!(
+ f,
+ "PartitionedTopKExec: fetch={}, partition=[{}], order=[{}]",
+ self.fetch,
+ partition_exprs.join(", "),
+ order_exprs.join(", "),
+ )
+ }
+ DisplayFormatType::TreeRender => {
+ writeln!(f, "fetch={}", self.fetch)?;
+ writeln!(f, "{}", self.expr)
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for PartitionedTopKExec {
+ fn name(&self) -> &'static str {
+ "PartitionedTopKExec"
+ }
+
+ fn properties(&self) -> &Arc<PlanProperties> {
+ &self.cache
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![Distribution::UnspecifiedDistribution]
Review Comment:
I think this should require a hash partitioning scheme on the window
partition key. The optimizer uses this API for sanity checks during
optimization.
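To show why the distribution matters, here is a toy model in plain Rust (no
DataFusion types; all function names are made up for this sketch). Each input
partition runs its own top-K, so if rows with the same key are spread across
partitions, the combined output can contain more than K rows per key. Sorting
stands in for the heap to keep the sketch short, and `n` must be non-zero.

```rust
use std::collections::HashMap;

/// A row is `(partition_key, value)`.
type Row = (u32, i64);

/// Keeps the `k` largest values per key within one input partition
/// (a stand-in for what one operator instance does).
pub fn local_topk(rows: &[Row], k: usize) -> Vec<Row> {
    let mut groups: HashMap<u32, Vec<i64>> = HashMap::new();
    for &(key, val) in rows {
        groups.entry(key).or_default().push(val);
    }
    let mut out = Vec::new();
    for (key, mut vals) in groups {
        vals.sort_unstable_by(|a, b| b.cmp(a)); // descending
        vals.truncate(k);
        out.extend(vals.into_iter().map(|v| (key, v)));
    }
    out
}

/// Splits rows by key so every row with the same key lands in the same
/// partition (a simplified model of hash partitioning).
pub fn split_by_key(rows: &[Row], n: usize) -> Vec<Vec<Row>> {
    let mut parts = vec![Vec::new(); n];
    for &row in rows {
        parts[row.0 as usize % n].push(row);
    }
    parts
}

/// Splits rows round-robin, ignoring the key.
pub fn split_round_robin(rows: &[Row], n: usize) -> Vec<Vec<Row>> {
    let mut parts = vec![Vec::new(); n];
    for (i, &row) in rows.iter().enumerate() {
        parts[i % n].push(row);
    }
    parts
}

/// Runs the local top-K on each partition and concatenates the results,
/// ordered by key ascending, then value descending.
pub fn run(parts: &[Vec<Row>], k: usize) -> Vec<Row> {
    let mut out: Vec<Row> = parts.iter().flat_map(|p| local_topk(p, k)).collect();
    out.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
    out
}
```

With four rows for one key, `k = 1`, and two partitions, the key-based split
returns one row, while the round-robin split returns two. Since the
`FilterExec` is removed by the rewrite, nothing downstream would drop the
extra row.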
##########
datafusion/sqllogictest/test_files/window_topn.slt:
##########
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+
+# http://www.apache.org/licenses/LICENSE-2.0
+
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Tests for Window TopN optimization: PartitionedTopKExec
+
+statement ok
+CREATE TABLE window_topn_t (id INT, pk INT, val INT) AS VALUES
Review Comment:
I suggest moving the main test coverage here, instead of keeping it in unit
tests across different layers such as optimizer tests. Once we have solid
coverage here, it is less likely to get lost during local refactors.
We can also extend the coverage with more edge cases, for example:
- predicates such as `rn < 2`, `2 > rn`, etc.
- mixing other window expressions with `row_number()`
- empty or overlapping partition / order keys, such as
`... OVER (ORDER BY id)` or `... OVER (PARTITION BY id ORDER BY id, customer)`
- different sort options such as `ASC`, `DESC`, and `NULLS FIRST`
- the `QUALIFY` clause
https://datafusion.apache.org/user-guide/sql/select.html#qualify-clause
- and more
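For the predicate cases, a small model of the expected mapping may help when
writing the test expectations. This is a sketch, not the rule's actual code:
`Cmp`, `RnSide`, and `fetch_from_predicate` are hypothetical names, and how the
real rule treats edge cases such as `rn < 1` is not covered here.

```rust
/// Comparison operators the sketch understands.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Cmp {
    Lt,
    LtEq,
    Gt,
    GtEq,
}

/// Which side of the comparison holds the `rn` column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RnSide {
    Left,
    Right,
}

/// Maps `rn <op> K` (RnSide::Left) or `K <op> rn` (RnSide::Right) to a
/// per-partition fetch, or `None` when the predicate is not an upper
/// bound on `rn` or the subtraction would underflow.
pub fn fetch_from_predicate(side: RnSide, op: Cmp, k: u64) -> Option<u64> {
    match (side, op) {
        // rn <= K  and  K >= rn  keep K rows.
        (RnSide::Left, Cmp::LtEq) | (RnSide::Right, Cmp::GtEq) => Some(k),
        // rn < K  and  K > rn  keep K - 1 rows.
        (RnSide::Left, Cmp::Lt) | (RnSide::Right, Cmp::Gt) => k.checked_sub(1),
        // Lower bounds such as rn > K do not limit the row count.
        _ => None,
    }
}
```

For example, `rn < 2` and `2 > rn` should both end up with a fetch of 1.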
##########
datafusion/physical-optimizer/src/window_topn.rs:
##########
@@ -0,0 +1,330 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`WindowTopN`] optimizer rule for per-partition top-K window queries.
+//!
+//! Detects queries of the form:
+//!
+//! ```sql
+//! SELECT * FROM (
+//! SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn
+//! FROM t
+//! ) WHERE rn <= K;
+//! ```
+//!
+//! And replaces the `FilterExec → BoundedWindowAggExec → SortExec` pipeline
+//! with `BoundedWindowAggExec → PartitionedTopKExec(fetch=K)`, removing both
+//! the `FilterExec` and `SortExec`.
+//!
+//! See [`PartitionedTopKExec`]
+//! for details on the replacement operator.
+
+use std::sync::Arc;
+
+use crate::PhysicalOptimizerRule;
+use arrow::datatypes::DataType;
+use datafusion_common::config::ConfigOptions;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_common::{Result, ScalarValue};
+use datafusion_expr::Operator;
+use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
+use datafusion_physical_expr::window::StandardWindowExpr;
+use datafusion_physical_plan::ExecutionPlan;
+use datafusion_physical_plan::filter::FilterExec;
+use datafusion_physical_plan::projection::ProjectionExec;
+use datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec;
+use datafusion_physical_plan::sorts::sort::SortExec;
+use datafusion_physical_plan::windows::{BoundedWindowAggExec, WindowUDFExpr};
+
+/// Physical optimizer rule that converts per-partition `ROW_NUMBER` top-K
+/// queries into a more efficient plan using [`PartitionedTopKExec`].
+///
+/// # Pattern Detected
+///
+/// ```text
+/// FilterExec(rn <= K)
+/// [optional ProjectionExec]
+/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
+/// SortExec(partition_keys, order_keys)
+/// ```
+///
+/// # Replacement
+///
+/// ```text
+/// [optional ProjectionExec]
+/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
+/// PartitionedTopKExec(partition_keys, order_keys, fetch=K)
+/// ```
+///
+/// The `FilterExec` is removed entirely (all output rows have `rn ∈ {1..K}`).
+/// The `SortExec` is replaced by `PartitionedTopKExec` which maintains a
+/// per-partition top-K heap instead of sorting the entire dataset.
+///
+/// # Supported Predicates
+///
+/// - `rn <= K` → fetch = K
+/// - `rn < K` → fetch = K - 1
+/// - `K >= rn` (flipped) → fetch = K
+/// - `K > rn` (flipped) → fetch = K - 1
+///
+/// # When the Rule Does NOT Fire
Review Comment:
It would be great to describe when this rule does apply, rather than
focusing on when it does not. This optimization should only trigger for a
fairly small set of cases.
##########
datafusion/physical-optimizer/src/window_topn.rs:
##########
@@ -0,0 +1,330 @@
+/// Physical optimizer rule that converts per-partition `ROW_NUMBER` top-K
+/// queries into a more efficient plan using [`PartitionedTopKExec`].
+///
+/// # Pattern Detected
+///
+/// ```text
+/// FilterExec(rn <= K)
+/// [optional ProjectionExec]
+/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
+/// SortExec(partition_keys, order_keys)
+/// ```
+///
+/// # Replacement
+///
+/// ```text
+/// [optional ProjectionExec]
+/// BoundedWindowAggExec(ROW_NUMBER PARTITION BY ... ORDER BY ...)
+/// PartitionedTopKExec(partition_keys, order_keys, fetch=K)
+/// ```
+///
+/// The `FilterExec` is removed entirely (all output rows have `rn ∈ {1..K}`).
+/// The `SortExec` is replaced by `PartitionedTopKExec` which maintains a
+/// per-partition top-K heap instead of sorting the entire dataset.
+///
+/// # Supported Predicates
+///
+/// - `rn <= K` → fetch = K
+/// - `rn < K` → fetch = K - 1
+/// - `K >= rn` (flipped) → fetch = K
+/// - `K > rn` (flipped) → fetch = K - 1
+///
+/// # When the Rule Does NOT Fire
+///
+/// - Window function is not `ROW_NUMBER` (e.g., `RANK`, `DENSE_RANK`)
+/// - No `PARTITION BY` clause (global top-K is already handled by
+/// `SortExec` with `fetch`)
+/// - Filter predicate is on a data column, not the window output column
+/// - `FilterExec` has an embedded projection
+/// - Child of `BoundedWindowAggExec` is not a `SortExec`
+/// - Config flag `enable_window_topn` is `false`
+///
+/// [`PartitionedTopKExec`]: datafusion_physical_plan::sorts::partitioned_topk::PartitionedTopKExec
+#[derive(Default, Clone, Debug)]
+pub struct WindowTopN;
+
+impl WindowTopN {
+ pub fn new() -> Self {
+ Self
+ }
+
+ /// Attempt to transform a single plan node.
+ ///
+ /// Returns `Some(new_plan)` if the node matches the
+ /// `FilterExec → [ProjectionExec] → BoundedWindowAggExec → SortExec`
+ /// pattern and can be rewritten, or `None` if the node should be
+ /// left unchanged.
+ fn try_transform(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
+ // Step 1: Match FilterExec at the top
+ let filter = plan.downcast_ref::<FilterExec>()?;
+
+ // Don't handle filters with projections
Review Comment:
I'm curious why this case is skipped.
##########
datafusion/physical-plan/src/sorts/partitioned_topk.rs:
##########
@@ -0,0 +1,498 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`PartitionedTopKExec`]: Top-K per partition operator
+//!
+//! For queries like:
+//! ```sql
+//! SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn
+//! FROM t WHERE rn <= N
+//! ```
+//!
+//! Instead of sorting the entire dataset, this operator maintains a
+//! [`TopK`] heap per partition (reusing the existing TopK implementation)
+//! and emits only the top-K rows per partition in sorted order
+//! `(partition_keys, order_keys)`.
+
+use std::fmt::{self, Formatter};
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, UInt32Array};
+use arrow::compute::take_record_batch;
+use arrow::datatypes::SchemaRef;
+use arrow::row::{OwnedRow, RowConverter};
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{HashMap, Result};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use parking_lot::RwLock;
+
+use crate::execution_plan::{Boundedness, EmissionType};
+use crate::metrics::ExecutionPlanMetricsSet;
+use crate::topk::{TopK, TopKDynamicFilters, build_sort_fields};
+use crate::{
+ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
+ PlanProperties, SendableRecordBatchStream, stream::RecordBatchStreamAdapter,
+};
+
+/// Per-partition Top-K operator for window function queries.
+///
+/// # Background
+///
+/// "Top K per partition" is a common analytics pattern used for queries such as
+/// "find the top 3 products by revenue for each store". The (simplified) SQL
+/// for such a query might be:
+///
+/// ```sql
+/// SELECT * FROM (
+/// SELECT *, ROW_NUMBER() OVER (PARTITION BY store ORDER BY revenue DESC) as rn
+/// FROM sales
+/// ) WHERE rn <= 3;
+/// ```
+///
+/// The unoptimized physical plan would be:
+///
+/// ```text
+/// FilterExec: rn <= 3
+/// BoundedWindowAggExec: ROW_NUMBER() PARTITION BY [store] ORDER BY [revenue DESC]
+/// SortExec: expr=[store ASC, revenue DESC]
+/// DataSourceExec
+/// ```
+///
+/// This plan sorts the **entire** dataset (O(N log N)), computes `ROW_NUMBER`
+/// for **all** rows, and then filters to keep only the top K per partition.
+/// With 10M rows, 1K partitions, and K=3, it sorts all 10M rows but only
+/// keeps 3K.
+///
+/// # Optimization
+///
+/// `PartitionedTopKExec` replaces the `SortExec` and the `FilterExec` is
+/// removed. The optimized plan becomes:
+///
+/// ```text
+/// BoundedWindowAggExec: ROW_NUMBER() PARTITION BY [store] ORDER BY [revenue DESC]
+/// PartitionedTopKExec: fetch=3, partition=[store], order=[revenue DESC]
+/// DataSourceExec
+/// ```
+///
+/// Instead of sorting the entire dataset, this operator reads unsorted input,
+/// maintains a [`TopK`] heap per distinct partition key, and emits only the
+/// top-K rows per partition in sorted order `(partition_keys, order_keys)`.
+///
+/// Cost: O(N log K) time instead of O(N log N), and O(K × P × row_size)
+/// memory where K = fetch, P = number of distinct partitions.
+///
+/// # Example
+///
+/// For the query above with `fetch=3` and input:
+///
+/// ```text
+/// store | revenue
+/// ------|--------
+/// A | 100
+/// B | 50
+/// A | 200
+/// B | 150
+/// A | 300
+/// A | 400
+/// ```
+///
+/// The operator maintains two heaps:
+/// - **store=A**: keeps top-3 by revenue DESC → {400, 300, 200}, evicts 100
+/// - **store=B**: keeps top-3 by revenue DESC → {150, 50} (only 2 rows)
+///
+/// Output (sorted by store ASC, revenue DESC):
+///
+/// ```text
+/// store | revenue
+/// ------|--------
+/// A | 400
+/// A | 300
+/// A | 200
+/// B | 150
+/// B | 50
+/// ```
+///
+/// This is then passed to `BoundedWindowAggExec` which assigns
+/// `ROW_NUMBER` 1, 2, 3 to each partition, all of which satisfy `rn <= 3`.
+///
+/// # Limitations
+///
+/// - Only activated when the window function is `ROW_NUMBER` with a
+/// `PARTITION BY` clause. Global top-K (no `PARTITION BY`) is already
+/// handled efficiently by `SortExec` with `fetch`.
+/// - Memory usage is proportional to `K × P`. For very high cardinality
+/// partition keys (millions of distinct values), this may use significant
+/// memory.
+#[derive(Debug, Clone)]
+pub struct PartitionedTopKExec {
+ /// Input execution plan (reads unsorted data)
+ input: Arc<dyn ExecutionPlan>,
+ /// Full sort expressions: `[partition_keys..., order_keys...]`.
+ ///
+ /// For `PARTITION BY store ORDER BY revenue DESC` with sort
+ /// `[store ASC, revenue DESC]`, the first `partition_prefix_len`
+ /// expressions are the partition keys (`[store ASC]`) and the
+ /// remaining are the order-by keys (`[revenue DESC]`).
+ expr: LexOrdering,
+ /// Number of leading expressions in `expr` that define the partition
+ /// key. For example, `PARTITION BY a, b` → `partition_prefix_len = 2`.
+ partition_prefix_len: usize,
+ /// Maximum number of rows to keep per partition (the K in "top-K").
+ /// Derived from the filter predicate: `rn <= 3` → `fetch = 3`,
+ /// `rn < 3` → `fetch = 2`.
+ fetch: usize,
+ /// Execution metrics
+ metrics_set: ExecutionPlanMetricsSet,
+ /// Cached plan properties (output ordering, partitioning, etc.)
+ cache: Arc<PlanProperties>,
+}
+
+impl PartitionedTopKExec {
+ /// Create a new `PartitionedTopKExec`.
+ ///
+ /// # Arguments
+ ///
+ /// * `input` - The child execution plan providing unsorted input rows.
+ /// * `expr` - Full sort ordering `[partition_keys..., order_keys...]`.
+ /// For `PARTITION BY pk ORDER BY val ASC`, this would be `[pk ASC, val ASC]`.
+ /// * `partition_prefix_len` - Number of leading expressions in `expr`
+ /// that form the partition key. Must be >= 1.
+ /// * `fetch` - Maximum rows to retain per partition (the K in "top-K").
+ ///
+ /// # Example
+ ///
+ /// ```text
+ /// // For: ROW_NUMBER() OVER (PARTITION BY store ORDER BY revenue DESC) ... WHERE rn <= 5
+ /// PartitionedTopKExec::try_new(
+ /// data_source,
+ /// LexOrdering([store ASC, revenue DESC]),
+ /// 1, // partition_prefix_len: 1 partition column (store)
+ /// 5, // fetch: keep top 5 per partition
+ /// )
+ /// ```
+ pub fn try_new(
+ input: Arc<dyn ExecutionPlan>,
+ expr: LexOrdering,
+ partition_prefix_len: usize,
+ fetch: usize,
+ ) -> Result<Self> {
+ let cache = Self::compute_properties(&input, expr.clone())?;
+ Ok(Self {
+ input,
+ expr,
+ partition_prefix_len,
+ fetch,
+ metrics_set: ExecutionPlanMetricsSet::new(),
+ cache: Arc::new(cache),
+ })
+ }
+
+ /// Returns the child execution plan.
+ pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
+ &self.input
+ }
+
+ /// Returns the full sort ordering `[partition_keys..., order_keys...]`.
+ pub fn expr(&self) -> &LexOrdering {
+ &self.expr
+ }
+
+ /// Returns the number of leading expressions in [`Self::expr`] that
+ /// define the partition key.
+ pub fn partition_prefix_len(&self) -> usize {
+ self.partition_prefix_len
+ }
+
+ /// Returns the maximum number of rows retained per partition.
+ pub fn fetch(&self) -> usize {
+ self.fetch
+ }
+
+ /// Compute [`PlanProperties`] for this operator.
+ ///
+ /// The output is sorted by `sort_exprs` (partition keys then order keys),
+ /// uses the same partitioning as the input, emits all output at once
+ /// (`EmissionType::Final`), and is bounded.
+ fn compute_properties(
+ input: &Arc<dyn ExecutionPlan>,
+ sort_exprs: LexOrdering,
+ ) -> Result<PlanProperties> {
+ let mut eq_properties = input.equivalence_properties().clone();
+ eq_properties.reorder(sort_exprs)?;
+
+ Ok(PlanProperties::new(
+ eq_properties,
+ input.output_partitioning().clone(),
+ EmissionType::Final,
+ Boundedness::Bounded,
+ ))
+ }
+}
+
+impl DisplayAs for PartitionedTopKExec {
+ fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter) -> fmt::Result {
+ match t {
+ DisplayFormatType::Default | DisplayFormatType::Verbose => {
+ let partition_exprs: Vec<String> = self.expr[..self.partition_prefix_len]
+ .iter()
+ .map(|e| format!("{}", e.expr))
+ .collect();
+ let order_exprs: Vec<String> = self.expr[self.partition_prefix_len..]
+ .iter()
+ .map(|e| format!("{e}"))
+ .collect();
+ write!(
+ f,
+ "PartitionedTopKExec: fetch={}, partition=[{}], order=[{}]",
+ self.fetch,
+ partition_exprs.join(", "),
+ order_exprs.join(", "),
+ )
+ }
+ DisplayFormatType::TreeRender => {
+ writeln!(f, "fetch={}", self.fetch)?;
+ writeln!(f, "{}", self.expr)
+ }
+ }
+ }
+}
+
+impl ExecutionPlan for PartitionedTopKExec {
+ fn name(&self) -> &'static str {
+ "PartitionedTopKExec"
+ }
+
+ fn properties(&self) -> &Arc<PlanProperties> {
+ &self.cache
+ }
+
+ fn required_input_distribution(&self) -> Vec<Distribution> {
+ vec![Distribution::UnspecifiedDistribution]
+ }
+
+ fn maintains_input_order(&self) -> Vec<bool> {
+ vec![false]
+ }
+
+ fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+ vec![&self.input]
+ }
+
+ fn with_new_children(
+ self: Arc<Self>,
+ children: Vec<Arc<dyn ExecutionPlan>>,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ assert_eq!(children.len(), 1);
+ Ok(Arc::new(PartitionedTopKExec::try_new(
+ Arc::clone(&children[0]),
+ self.expr.clone(),
+ self.partition_prefix_len,
+ self.fetch,
+ )?))
+ }
+
+ fn apply_expressions(
Review Comment:
Not related to this PR, but I'm curious why this is a required
`ExecutionPlan` API and when it is used, given that different operators can
hold expressions for very different purposes 🤔
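
As a side note for readers of this thread, the per-partition heap approach described in the `PartitionedTopKExec` doc comment above can be sketched with standard-library types only. This is a simplified illustration under stated assumptions, not the PR's implementation (which reuses DataFusion's `TopK` on Arrow batches): the function name `top_k_per_partition`, the `(&str, i64)` row shape, and the fixed `store ASC, revenue DESC` ordering are all hypothetical choices made for the example.

```rust
use std::cmp::Reverse;
use std::collections::{BTreeMap, BinaryHeap};

/// Keep the `k` largest revenues for each store and return them ordered by
/// (store ASC, revenue DESC), mirroring the example in the doc comment.
pub fn top_k_per_partition(rows: &[(&str, i64)], k: usize) -> Vec<(String, i64)> {
    // One bounded min-heap per partition key. `Reverse` turns the std
    // max-heap into a min-heap, so `pop` evicts the smallest revenue.
    // A BTreeMap keeps partition keys in ascending order for the output.
    let mut heaps: BTreeMap<String, BinaryHeap<Reverse<i64>>> = BTreeMap::new();

    for &(store, revenue) in rows {
        let heap = heaps.entry(store.to_string()).or_default();
        heap.push(Reverse(revenue));
        // Never hold more than k rows per partition: O(log k) per input row.
        if heap.len() > k {
            heap.pop();
        }
    }

    let mut out = Vec::new();
    for (store, heap) in heaps {
        // Ascending order of `Reverse<i64>` is descending order of revenue.
        for Reverse(revenue) in heap.into_sorted_vec() {
            out.push((store.clone(), revenue));
        }
    }
    out
}
```

Called with the six input rows from the doc comment's example and `k = 3`, this returns the same five rows shown in its output table. Memory stays bounded by `k` entries per distinct key, which is why high partition cardinality is the costly case.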
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]