haohuaijin commented on code in PR #21527:
URL: https://github.com/apache/datafusion/pull/21527#discussion_r3078556470


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -1424,6 +1426,50 @@ impl ExecutionPlan for SortExec {
 
         Ok(FilterDescription::new().with_child(child))
     }
+
+    fn handle_child_pushdown_result(
+        &self,
+        phase: FilterPushdownPhase,
+        child_pushdown_result: ChildPushdownResult,
+        _config: &datafusion_common::config::ConfigOptions,
+    ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        // Only absorb filters in Pre phase for a plain sort (no fetch).
+        // A sort with fetch (TopK) must not accept filters: reordering
+        // filter vs. limit would change semantics.
+        if phase != FilterPushdownPhase::Pre || self.fetch.is_some() {

Review Comment:
   Good question, my initial though was that the swap of sort and filter is 
unrelated to dynamic filters, so I only do swap for pre pahse. but today when i 
reconsider this part, i find one incorrect example. i added this example in 
[10f1462](https://github.com/apache/datafusion/pull/21527/commits/10f14626f56c49da40d982d5382436f436c4cb9b)
   
   if i remove the `phase != FilterPushdownPhase::Pre`, the result will like 
below, we insert the dynamic filter under the sort, before the datasource if 
datasource do not accept dynamic filter, 
   ```rust
       insta::assert_snapshot!(
           OptimizationTest::new(
               Arc::clone(&plan),
               FilterPushdown::new_post_optimization(),
               true
           ),
           @r"
       OptimizationTest:
         input:
           - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), 
(b@1, b@1)]
           -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=true
           -   SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
           -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=false
         output:
           Ok:
             - HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(a@0, a@0), 
(b@1, b@1)]
             -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=true
             -   SortExec: expr=[a@0 ASC], preserve_partitioning=[false]
             -     FilterExec: DynamicFilter [ empty ]
             -       DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b], file_type=test, pushdown_supported=false
       "
       );
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to