Copilot commented on code in PR #21828:
URL: https://github.com/apache/datafusion/pull/21828#discussion_r3137788839
##########
datafusion/physical-optimizer/src/limit_pushdown.rs:
##########
@@ -268,11 +268,28 @@ pub fn pushdown_limit_helper(
.unwrap_or(plan_with_fetch);
if global_skip > 0 {
- add_global_limit(
- plan_with_preserve_order,
- global_skip,
- Some(global_fetch),
- )
+ // Push offset to the plan. If the plan fully handles
+ // offset (e.g. parquet without WHERE), eliminate
+ // GlobalLimitExec. Otherwise keep it for remaining skip.
+ if let Some(plan_with_offset) =
+ plan_with_preserve_order.with_offset(global_skip)
+ {
+ if plan_with_offset.offset_fully_handled() {
+ plan_with_offset
+ } else {
+ add_global_limit(
+ plan_with_offset,
+ global_skip,
+ Some(global_fetch),
+ )
+ }
Review Comment:
When `with_offset(global_skip)` returns `Some` but `offset_fully_handled()`
is false, this code still keeps `GlobalLimitExec` with `skip=global_skip` while
also pushing the same offset into the child plan. If the child actually skips
any rows (e.g. parquet skipping fully-matched row groups), the offset will be
applied twice and results will be incorrect. Consider only calling/using
`with_offset` when the plan can fully handle the offset (and otherwise leave
the child unchanged), or alternatively adjust the `GlobalLimitExec` skip to
reflect only the remaining offset actually not handled by the child (requires a
way to compute/report it).
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1101,6 +1105,23 @@ impl RowGroupsPrunedParquetOpen {
None
};
+ // Prune by offset: skip leading fully-matched row groups that fall
+ // entirely within the offset, so they are never read from disk.
+ let remaining_offset = if let Some(offset) = prepared.offset {
+ if offset > 0 {
+ Some(row_groups.prune_by_offset(
+ offset,
+ prepared.predicate.is_some(),
+ rg_metadata,
+ &prepared.file_metrics,
+ ))
+ } else {
+ None
+ }
+ } else {
+ None
Review Comment:
Offset pruning is applied even when `prepared.predicate.is_some()` (via
`prune_by_offset(..., prepared.predicate.is_some(), ...)`). Since the optimizer
keeps `GlobalLimitExec` when predicates exist (offset not fully handled),
skipping any fully-matched leading row groups here will change the stream
output and can cause the global `skip` to be applied twice (wrong results when
the offset spans ≥1 fully-matched row group). Consider only performing
offset-based row group pruning / selection when offset is fully handled within
parquet (currently the `predicate.is_none()` case), or otherwise ensure the
remaining global skip is reduced accordingly.
```suggestion
// Prune by offset only when parquet can fully handle it. When a
// predicate exists, the optimizer keeps a global skip/offset above
// parquet, so pruning fully-matched leading row groups here would
// change the stream seen by that operator and can apply the offset
// twice.
        let remaining_offset = match (prepared.offset, prepared.predicate.is_none()) {
            (Some(offset), true) if offset > 0 => Some(row_groups.prune_by_offset(
                offset,
                false,
                rg_metadata,
                &prepared.file_metrics,
            )),
            _ => None,
```
##########
datafusion/sqllogictest/test_files/sort_pushdown.slt:
##########
@@ -2280,3 +2280,146 @@ SET datafusion.execution.collect_statistics = true;
statement ok
SET datafusion.optimizer.enable_sort_pushdown = true;
+
+# ===========================================================
+# Test N: OFFSET pushdown to parquet
+# Verifies that OFFSET is pushed down to DataSourceExec,
+# eliminating GlobalLimitExec and skipping row groups.
+# ===========================================================
+
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+statement ok
+SET datafusion.execution.parquet.max_row_group_size = 5;
+
+statement ok
+CREATE TABLE tn_data(id INT, value INT) AS VALUES
+(1,10),(2,20),(3,30),(4,40),(5,50),
+(6,60),(7,70),(8,80),(9,90),(10,100),
+(11,110),(12,120),(13,130),(14,140),(15,150);
+
+query I
+COPY (SELECT * FROM tn_data ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/tn_offset/data.parquet';
+----
+15
+
+statement ok
+SET datafusion.execution.parquet.max_row_group_size = 1048576;
+
+statement ok
+CREATE EXTERNAL TABLE tn_offset(id INT, value INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/tn_offset/data.parquet';
+
+# Test N.1: OFFSET pushdown — offset pushed to DataSourceExec, GlobalLimitExec eliminated
+query TT
+EXPLAIN SELECT * FROM tn_offset LIMIT 3 OFFSET 5;
+----
+logical_plan
+01)Limit: skip=5, fetch=3
+02)--TableScan: tn_offset projection=[id, value], fetch=8
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tn_offset/data.parquet]]}, projection=[id, value], limit=8, offset=5, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+
+# Test N.2: Results skip first 5 rows, return next 3
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 5;
+----
+6 60
+7 70
+8 80
+
+# Test N.3: Large offset near end
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 12;
+----
+13 130
+14 140
+15 150
+
+# Test N.4: Offset at last few rows
+query II
+SELECT * FROM tn_offset LIMIT 10 OFFSET 13;
+----
+14 140
+15 150
+
+# Test N.5: Offset exactly at boundary (skip first RG entirely)
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 5;
+----
+6 60
+7 70
+8 80
+
+# Test N.6: Offset zero (no skip)
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 0;
+----
+1 10
+2 20
+3 30
+
+# Test N.7: LIMIT 5 OFFSET 10 spanning RG boundaries
+# With max_row_group_size=5, we have 3 RGs: [1-5], [6-10], [11-15]
+# OFFSET 10 skips rows 1-10, LIMIT 5 returns rows 11-15
+query II
+SELECT * FROM tn_offset LIMIT 5 OFFSET 10;
+----
+11 110
+12 120
+13 130
+14 140
+15 150
+
+# Test N.8: OFFSET with multi-partition (target_partitions=4)
+statement ok
+SET datafusion.execution.target_partitions = 4;
+
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 5;
+----
+6 60
+7 70
+8 80
+
+# Restore single partition for remaining tests
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+# Test N.9: OFFSET with WHERE clause — offset still works
+# (GlobalLimitExec may remain when there is a filter)
+query II
+SELECT * FROM tn_offset WHERE value > 50 LIMIT 3 OFFSET 2;
+----
+8 80
+9 90
+10 100
+
Review Comment:
The WHERE-clause coverage here uses `OFFSET 2`, which does not exercise the
case where the offset spans at least one fully-matched row group under a
predicate (the scenario that can break if offset is both pushed into parquet
and also applied by `GlobalLimitExec`). Consider adding a variant like `WHERE
value > 50 LIMIT 3 OFFSET 7` (with 5-row row groups) to ensure correctness when
the offset crosses a fully-matched row group boundary under filtering.
```suggestion
# Test N.9b: OFFSET with WHERE clause crossing a fully-matched row-group boundary
# Ensures correctness when OFFSET may be pushed into parquet and also applied by GlobalLimitExec
query II
SELECT * FROM tn_offset WHERE value > 50 LIMIT 3 OFFSET 7;
----
13 130
14 140
15 150
```
##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -849,6 +871,28 @@ impl DataSource for FileScanConfig {
self.limit
}
+ fn with_offset(&self, offset: usize) -> Option<Arc<dyn DataSource>> {
+ if !self.file_source.supports_offset() {
+ return None;
+ }
+ let source = FileScanConfigBuilder::from(self.clone())
+ .with_offset(Some(offset))
+ .build();
+ Some(Arc::new(source))
+ }
+
+ fn offset(&self) -> Option<usize> {
+ self.offset
+ }
+
+ fn offset_fully_handled(&self) -> bool {
+ // Offset is fully handled when set AND no filter —
+ // raw row counts are accurate for offset calculation.
+ // With filters, only fully-matched RGs can be skipped,
+ // GlobalLimitExec handles the rest.
+ self.offset.is_some() && self.file_source.filter().is_none()
Review Comment:
`offset_fully_handled()` currently returns true whenever `offset` is set and
there is no filter, but it does not account for multi-partition scans.
`FileStream` applies `limit` per output partition, so removing
`GlobalLimitExec` when `file_groups.len() > 1` can yield more than `fetch` rows
(and make OFFSET/LIMIT semantics depend on partitioning). Consider requiring a
single output partition (e.g. `self.file_groups.len() == 1` /
`output_partitioning().partition_count() == 1`) before reporting the offset as
fully handled.
```suggestion
// Offset is fully handled only when set, no filter is present,
// and the scan has a single output partition.
//
// With filters, only fully-matched RGs can be skipped and
// GlobalLimitExec handles the rest. Likewise, multi-partition
// scans must retain global limit enforcement because FileStream
// applies limits per output partition.
self.offset.is_some()
&& self.file_source.filter().is_none()
&& self.file_groups.len() == 1
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]