zhuqi-lucas commented on code in PR #21580:
URL: https://github.com/apache/datafusion/pull/21580#discussion_r3123894244


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1123,13 +1166,157 @@ impl RowGroupsPrunedParquetOpen {
             );
         }
 
-        // Prepare the access plan (extract row groups and row selection)
+        // Row group ordering optimization (two composable steps):
+        //
+        // 1. reorder_by_statistics: sort RGs by min values (ASC) to align
+        //    with the file's declared output ordering. This fixes out-of-order
+        //    RGs (e.g., from append-heavy workloads) without changing 
direction.
+        //    Skipped gracefully when statistics are unavailable.
+        //
+        // 2. reverse: flip the order for DESC queries. Applied AFTER reorder
+        //    so the reversed order is correct whether or not reorder changed
+        //    anything. Also handles row_selection remapping.
+        //
+        // For sorted data: reorder is a no-op, reverse gives perfect DESC.
+        // For unsorted data: reorder fixes the order, reverse flips for DESC.
+        // Build reorder optimizer from sort_order_for_reorder (Inexact path)
+        // or from DynamicFilterPhysicalExpr sort_options (any TopK query).
+        // Fuzz test uses tiebreaker columns so reorder is safe for all TopK.
+        let reorder_optimizer: Option<
+            Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+        > = if let Some(sort_order) = &prepared.sort_order_for_reorder {
+            Some(
+                
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+                    sort_order.clone(),
+                ))
+                    as Box<dyn 
crate::access_plan_optimizer::AccessPlanOptimizer>,
+            )
+        } else if let Some(predicate) = &prepared.predicate
+            && let Some(df) = find_dynamic_filter(predicate)
+            && let Some(sort_options) = df.sort_options()
+            && sort_options.len() == 1
+        {
+            // Build a sort order from DynamicFilter for non-sort-pushdown 
TopK.
+            // Quick bail: check if the sort column exists in file schema.
+            // For GROUP BY + ORDER BY, the sort column is an aggregate output
+            // (not in parquet) — skip to avoid wasted StatisticsConverter 
work.
+            let children = df.children();
+            if !children.is_empty() {
+                let col = find_column_in_expr(children[0]);
+                if let Some(ref c) = col
+                    && prepared
+                        .physical_file_schema
+                        .field_with_name(c.name())
+                        .is_ok()
+                {
+                    let sort_expr =
+                        
datafusion_physical_expr_common::sort_expr::PhysicalSortExpr {
+                            expr: Arc::clone(children[0]),
+                            options: arrow::compute::SortOptions {
+                                descending: false,
+                                nulls_first: sort_options[0].nulls_first,
+                            },
+                        };
+                    LexOrdering::new(vec![sort_expr]).map(|order| {
+                        
Box::new(crate::access_plan_optimizer::ReorderByStatistics::new(
+                            order,
+                        ))
+                            as Box<dyn 
crate::access_plan_optimizer::AccessPlanOptimizer>
+                    })
+                } else {
+                    None
+                }
+            } else {
+                None
+            }
+        } else {
+            None
+        };
+
+        // Reverse for DESC queries. Only when reorder is active (the sort
+        // column exists in parquet stats). Without reorder, reversing RGs
+        // randomly changes I/O patterns with no benefit.
+        let is_descending = prepared.reverse_row_groups
+            || (reorder_optimizer.is_some()
+                && prepared
+                    .predicate
+                    .as_ref()
+                    .and_then(find_dynamic_filter)
+                    .and_then(|df| df.sort_options().map(|opts| 
opts[0].descending))
+                    .unwrap_or(false));
+        let reverse_optimizer: Option<
+            Box<dyn crate::access_plan_optimizer::AccessPlanOptimizer>,
+        > = if is_descending {
+            Some(Box::new(crate::access_plan_optimizer::ReverseRowGroups))
+        } else {
+            None
+        };
+
+        // Prepare the access plan and apply optimizers in order:
+        // 1. reorder (fix out-of-order RGs to match declared ordering)
+        // 2. reverse (flip for DESC queries)
         let mut prepared_plan = access_plan.prepare(rg_metadata)?;
+        if let Some(opt) = &reorder_optimizer {
+            prepared_plan = opt.optimize(
+                prepared_plan,
+                file_metadata.as_ref(),
+                &prepared.physical_file_schema,
+            )?;
+        }
+        if let Some(opt) = &reverse_optimizer {
+            prepared_plan = opt.optimize(
+                prepared_plan,
+                file_metadata.as_ref(),
+                &prepared.physical_file_schema,
+            )?;
+        }
 
-        // Potentially reverse the access plan for performance.
-        // See `ParquetSource::try_pushdown_sort` for the rationale.
-        if prepared.reverse_row_groups {
-            prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
+        // TopK cumulative pruning: after reorder + reverse, the RGs are in
+        // optimal order. Accumulate rows from the front until >= K, prune 
rest.
+        //
+        // Only safe when predicate is DynamicFilter-only (no WHERE clause).
+        // With WHERE, raw num_rows overestimates qualifying rows — cumulative
+        // prune may keep too few RGs, returning fewer than K results.
+        //
+        // Additionally requires either sort pushdown (guaranteed 
non-overlapping)
+        // or verified non-overlap from statistics.
+        let is_pure_dynamic_filter = 
prepared.predicate.as_ref().is_some_and(|p| {
+            let any_ref: &dyn std::any::Any = p.as_ref();
+            any_ref
+                .downcast_ref::<DynamicFilterPhysicalExpr>()
+                .is_some()
+        });
+        let has_sort_pushdown = prepared.sort_order_for_reorder.is_some();
+        if is_pure_dynamic_filter
+            && let Some(predicate) = &prepared.predicate
+            && let Some(df) = find_dynamic_filter(predicate)
+            && let Some(fetch) = df.fetch()
+            && (has_sort_pushdown
+                || rgs_are_non_overlapping(
+                    &prepared_plan,
+                    file_metadata.as_ref(),
+                    &prepared.physical_file_schema,
+                    &df,
+                ))
+        {
+            let rg_indexes = prepared_plan.row_group_indexes();
+            let mut cumulative = 0usize;
+            let mut keep_count = 0;
+            for &idx in rg_indexes {
+                cumulative += file_metadata.row_group(idx).num_rows() as usize;

Review Comment:
   Good point. Currently create_filter and fetch are set in the same method 
(with_fetch), and we fixed the ordering so that fetch is set before 
create_filter is called. There's no separate code path that updates fetch 
without recreating the filter.
   
   But you're right that this coupling is fragile: if a future optimizer calls 
with_fetch independently, the filter's fetch would go stale. As a follow-up, 
I'll consider making fetch on DynamicFilterPhysicalExpr read directly from 
SortExec.fetch (via a shared reference) instead of copying the value at 
creation time.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to