2010YOUY01 commented on code in PR #21479:
URL: https://github.com/apache/datafusion/pull/21479#discussion_r3068900016


##########
datafusion/physical-plan/src/sorts/partitioned_topk.rs:
##########
@@ -0,0 +1,512 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! [`PartitionedTopKExec`]: Top-K per partition operator
+//!
+//! For queries like:
+//! ```sql
+//! SELECT *, ROW_NUMBER() OVER (PARTITION BY pk ORDER BY val) as rn
+//! FROM t WHERE rn <= N
+//! ```
+//!
+//! Instead of sorting the entire dataset, this operator maintains a
+//! [`TopK`] heap per partition (reusing the existing TopK implementation)
+//! and emits only the top-K rows per partition in sorted order
+//! `(partition_keys, order_keys)`.
+
+use std::fmt::{self, Formatter};
+use std::sync::Arc;
+
+use arrow::array::{RecordBatch, UInt32Array};
+use arrow::compute::take_record_batch;
+use arrow::datatypes::SchemaRef;
+use arrow::row::{OwnedRow, RowConverter};
+use datafusion_common::tree_node::TreeNodeRecursion;
+use datafusion_common::{HashMap, Result};
+use datafusion_execution::TaskContext;
+use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_expr::expressions::{DynamicFilterPhysicalExpr, lit};
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use futures::StreamExt;
+use futures::TryStreamExt;
+use parking_lot::RwLock;
+
+use crate::execution_plan::{Boundedness, EmissionType};
+use crate::metrics::ExecutionPlanMetricsSet;
+use crate::topk::{TopK, TopKDynamicFilters, build_sort_fields};
+use crate::{
+    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, 
ExecutionPlanProperties,
+    PlanProperties, SendableRecordBatchStream, 
stream::RecordBatchStreamAdapter,
+};
+
+/// Per-partition Top-K operator for window function queries.
+///
+/// # Background
+///
+/// "Top K per partition" is a common analytics pattern used for queries such 
as
+/// "find the top 3 products by revenue for each store". The (simplified) SQL
+/// for such a query might be:
+///
+/// ```sql
+/// SELECT * FROM (
+///     SELECT *, ROW_NUMBER() OVER (PARTITION BY store ORDER BY revenue DESC) 
as rn
+///     FROM sales
+/// ) WHERE rn <= 3;
+/// ```
+///
+/// The unoptimized physical plan would be:
+///
+/// ```text
+/// FilterExec: rn <= 3
+///   BoundedWindowAggExec: ROW_NUMBER() PARTITION BY [store] ORDER BY 
[revenue DESC]
+///     SortExec: expr=[store ASC, revenue DESC]
+///       DataSourceExec
+/// ```
+///
+/// This plan sorts the **entire** dataset (O(N log N)), computes `ROW_NUMBER`
+/// for **all** rows, and then filters to keep only the top K per partition.
+/// With 10M rows, 1K partitions, and K=3, it sorts all 10M rows but only
+/// keeps 3K.
+///
+/// # Optimization
+///
+/// `PartitionedTopKExec` replaces the `SortExec` and the `FilterExec` is
+/// removed. The optimized plan becomes:
+///
+/// ```text
+/// BoundedWindowAggExec: ROW_NUMBER() PARTITION BY [store] ORDER BY [revenue 
DESC]
+///   PartitionedTopKExec: fetch=3, partition=[store], order=[revenue DESC]
+///     DataSourceExec
+/// ```
+///
+/// Instead of sorting the entire dataset, this operator reads unsorted input,
+/// maintains a [`TopK`] heap per distinct partition key, and emits only the
+/// top-K rows per partition in sorted order `(partition_keys, order_keys)`.
+///
+/// Cost: O(N log K) time instead of O(N log N), and O(K × P × row_size)
+/// memory where K = fetch, P = number of distinct partitions.
+///

Review Comment:
   ```suggestion
   /// ## Why maintaining partition key order in output
   /// Window functions do not require partition keys to be globally sorted, and
   /// enforcing such ordering in the output can introduce unnecessary overhead.
   /// However, the physical optimizer framework currently cannot express an
   /// ordering that is only grouped by some keys while ordered by others. For
   /// example:
   ///
   /// ```text
   /// PARTITION BY a ORDER BY b
   ///
   /// a  b
   /// ----
   /// 2  1
   /// 2  2
   /// 1  1
   /// 1  2
   ///
   /// (Rows with the same `a` should be consecutive, but `a` does not need to 
be
   /// globally ordered.)
   /// ```
   ///
   /// For simplicity, the output is sorted here.
   ///
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to