adriangb commented on code in PR #21956:
URL: https://github.com/apache/datafusion/pull/21956#discussion_r3246022325
##########
datafusion/core/tests/physical_optimizer/pushdown_sort.rs:
##########
@@ -310,7 +317,7 @@ fn test_no_prefix_match_longer_than_source() {
output:
Ok:
- SortExec: expr=[a@0 ASC, b@1 DESC NULLS LAST],
preserve_partitioning=[false]
- - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], output_ordering=[a@0 DESC NULLS LAST], file_type=parquet
+ - DataSourceExec: file_groups={1 group: [[x]]}, projection=[a, b,
c, d, e], file_type=parquet
Review Comment:
I wonder if we could include the inexact ordering, perhaps under an `EXPLAIN
ANALYZE` option?
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -482,6 +485,107 @@ impl ParquetSource {
pub(crate) fn reverse_row_groups(&self) -> bool {
self.reverse_row_groups
}
+
+ /// Extract the (column name, descending) tuple used by file-level
+ /// reordering. Driven entirely from the sort-pushdown channel
+ /// (`sort_order_for_reorder` + `reverse_row_groups`) — set by
+ /// `try_pushdown_sort`. We do not consult any dynamic-filter
+ /// metadata here: `DynamicFilterPhysicalExpr` is for runtime
+ /// threshold pruning, not for telling the source how to schedule
+ /// reads.
+ fn extract_topk_sort_info(&self) -> Option<(String, bool)> {
+ let sort_order = self.sort_order_for_reorder.as_ref()?;
+ let first = sort_order.first();
+ let col = first
+ .expr
+ .downcast_ref::<datafusion_physical_expr::expressions::Column>()?;
+ Some((col.name().to_string(), self.reverse_row_groups))
+ }
+
+ /// Extract the sort key from a file's statistics for reordering.
+ fn sort_key_for_file(
+ file: &datafusion_datasource::PartitionedFile,
+ col_idx: usize,
+ descending: bool,
+ ) -> Option<datafusion_common::ScalarValue> {
+ let stats = file.statistics.as_ref()?;
+ let col_stats = stats.column_statistics.get(col_idx)?;
+ if descending {
+ col_stats.min_value.get_value().cloned()
+ } else {
+ col_stats.max_value.get_value().cloned()
+ }
+ }
+}
+
+/// Threshold (fraction in `[0, 1]`) for the overlap guard in
+/// [`ParquetSource::reorder_files`]. When at least this fraction of
+/// adjacent file pairs (in sorted-by-min order) have overlapping
+/// `[min, max]` ranges, file reorder is skipped — file-level pruning
+/// cannot help and the reorder cost would dominate.
Review Comment:
Curious what that cost is?
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -482,6 +485,107 @@ impl ParquetSource {
pub(crate) fn reverse_row_groups(&self) -> bool {
self.reverse_row_groups
}
+
+ /// Extract the (column name, descending) tuple used by file-level
+ /// reordering. Driven entirely from the sort-pushdown channel
+ /// (`sort_order_for_reorder` + `reverse_row_groups`) — set by
+ /// `try_pushdown_sort`. We do not consult any dynamic-filter
+ /// metadata here: `DynamicFilterPhysicalExpr` is for runtime
+ /// threshold pruning, not for telling the source how to schedule
+ /// reads.
Review Comment:
This seems like a typical LLM comment: it's referencing the previous
implementation in this PR. That won't be helpful to readers once this code is
merged. Please update the comment to explain what the current / final code is
doing, not what previous drafts did.
##########
datafusion/datasource-parquet/src/access_plan.rs:
##########
@@ -16,10 +16,28 @@
// under the License.
use crate::sort::reverse_row_selection;
+use arrow::array::{Array, ArrayRef, BooleanArray};
+use arrow::datatypes::Schema;
use datafusion_common::{Result, assert_eq_or_internal_err};
+use datafusion_physical_expr::expressions::Column;
+use datafusion_physical_expr_common::sort_expr::LexOrdering;
+use log::debug;
+use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
+/// Fraction of adjacent (in sorted-by-min order) row group pairs whose
+/// `[min, max]` ranges overlap above which `reorder_by_statistics` will
+/// bail out without reordering.
+///
+/// When stats overlap heavily (e.g. unsorted columns like ClickBench's
+/// `EventTime` on `hits_partitioned`), reordering by min cannot enable
+/// row-group-level pruning — every "later" RG still has values that
+/// could appear in TopK. The reorder cost (CPU sort + lost IO sequential
+/// locality + parallel scheduling pessimization across workers all
+/// pulling "best" RGs first) then dominates, producing a net regression.
Review Comment:
I wonder how much each of these effects contributes? I'd imaging CPU to sort
row groups (few containers) should be pretty minimal. If it's row groups being
sorted I don't know how much data locality matters (row groups are a read
boundary). So I'd guess the parallel scheduling pessimization matters the most?
But if we only sort within a file and sort files within the partitions they are
already in, I'd think there is almost no impact on parallelization? Or with the
new morsel / file queue: if there ordering is inexact we can still keep the
cross-partition work stealing.
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -815,17 +975,53 @@ impl FileSource for ParquetSource {
new
};
- // Check if the reversed ordering satisfies the requested ordering
- if !reversed_eq_properties.ordering_satisfy(order.iter().cloned())? {
- return Ok(SortOrderPushdownResult::Unsupported);
+ // Check if the reversed ordering satisfies the requested ordering.
+ // If yes, this is the "Inexact via row-group reversal" case: source
+ // declares ASC ordering, request is DESC (or vice versa), so iterating
+ // RGs in reverse approximates the requested order.
+ if reversed_eq_properties.ordering_satisfy(order.iter().cloned())? {
+ let sort_order = LexOrdering::new(order.iter().cloned());
+ let mut new_source = self.clone().with_reverse_row_groups(true);
+ new_source.sort_order_for_reorder = sort_order;
+ return Ok(SortOrderPushdownResult::Inexact {
+ inner: Arc::new(new_source) as Arc<dyn FileSource>,
+ });
}
- // Return Inexact because we're only reversing row group order,
- // not guaranteeing perfect row-level ordering
- let new_source = self.clone().with_reverse_row_groups(true);
- Ok(SortOrderPushdownResult::Inexact {
- inner: Arc::new(new_source) as Arc<dyn FileSource>,
- })
+ // Inexact via stats-based row-group reorder. Even when neither natural
+ // nor reversed ordering matches (e.g. the table has no declared
ordering),
+ // if the sort column is a plain column present in the file schema, the
+ // source can still reorder row groups by their min/max statistics at
+ // runtime. This helps any `ORDER BY` on a sorted source, not just TopK
+ // — the optimization is no longer tied to the dynamic filter path.
Review Comment:
Idem above: comments should not reference outdated implementations.
##########
datafusion/datasource-parquet/src/source.rs:
##########
@@ -815,17 +975,53 @@ impl FileSource for ParquetSource {
new
};
- // Check if the reversed ordering satisfies the requested ordering
- if !reversed_eq_properties.ordering_satisfy(order.iter().cloned())? {
- return Ok(SortOrderPushdownResult::Unsupported);
+ // Check if the reversed ordering satisfies the requested ordering.
+ // If yes, this is the "Inexact via row-group reversal" case: source
+ // declares ASC ordering, request is DESC (or vice versa), so iterating
+ // RGs in reverse approximates the requested order.
+ if reversed_eq_properties.ordering_satisfy(order.iter().cloned())? {
+ let sort_order = LexOrdering::new(order.iter().cloned());
+ let mut new_source = self.clone().with_reverse_row_groups(true);
+ new_source.sort_order_for_reorder = sort_order;
+ return Ok(SortOrderPushdownResult::Inexact {
+ inner: Arc::new(new_source) as Arc<dyn FileSource>,
+ });
Review Comment:
Do we still need this as a special case? If it doesn't reverse the scans
within row groups how is it different than the new general "reorder based on
stats"?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]