Copilot commented on code in PR #21828:
URL: https://github.com/apache/datafusion/pull/21828#discussion_r3137788839


##########
datafusion/physical-optimizer/src/limit_pushdown.rs:
##########
@@ -268,11 +268,28 @@ pub fn pushdown_limit_helper(
                     .unwrap_or(plan_with_fetch);
 
                 if global_skip > 0 {
-                    add_global_limit(
-                        plan_with_preserve_order,
-                        global_skip,
-                        Some(global_fetch),
-                    )
+                    // Push offset to the plan. If the plan fully handles
+                    // offset (e.g. parquet without WHERE), eliminate
+                    // GlobalLimitExec. Otherwise keep it for remaining skip.
+                    if let Some(plan_with_offset) =
+                        plan_with_preserve_order.with_offset(global_skip)
+                    {
+                        if plan_with_offset.offset_fully_handled() {
+                            plan_with_offset
+                        } else {
+                            add_global_limit(
+                                plan_with_offset,
+                                global_skip,
+                                Some(global_fetch),
+                            )
+                        }

Review Comment:
   When `with_offset(global_skip)` returns `Some` but `offset_fully_handled()` 
is false, this code keeps `GlobalLimitExec` with `skip=global_skip` while also 
pushing the same offset into the child plan. If the child actually skips any 
rows (e.g. parquet skipping fully-matched row groups), the offset is applied 
twice and the results are incorrect. Consider calling `with_offset` only when 
the plan can fully handle the offset (and otherwise leaving the child 
unchanged), or alternatively adjust the `GlobalLimitExec` skip to reflect only 
the remaining offset not handled by the child (which requires a way for the 
child to compute and report how much it skipped).
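
   A minimal standalone model of the first option, using a hypothetical `Plan` 
type and `push_skip` helper rather than the DataFusion API, to show the 
invariant that the skip is applied exactly once, either by the child or by 
`GlobalLimitExec` but never both:

   ```rust
   // Sketch only: hypothetical types modeling the decision in
   // pushdown_limit_helper, not DataFusion's actual ExecutionPlan API.
   #[derive(Clone)]
   struct Plan {
       offset: Option<usize>,
       fully_handles_offset: bool,
   }

   impl Plan {
       fn with_offset(&self, offset: usize) -> Option<Plan> {
           Some(Plan { offset: Some(offset), ..self.clone() })
       }
       fn offset_fully_handled(&self) -> bool {
           self.fully_handles_offset
       }
   }

   /// Returns (child plan, skip left for GlobalLimitExec). The skip lives in
   /// exactly one of the two places.
   fn push_skip(child: Plan, skip: usize) -> (Plan, Option<usize>) {
       match child.with_offset(skip) {
           // Child absorbs the whole offset: GlobalLimitExec is not needed.
           Some(p) if p.offset_fully_handled() => (p, None),
           // Child cannot fully handle it: leave it untouched and keep the
           // entire skip in GlobalLimitExec.
           _ => (child, Some(skip)),
       }
   }

   fn main() {
       let no_filter = Plan { offset: None, fully_handles_offset: true };
       let filtered = Plan { offset: None, fully_handles_offset: false };

       let (child, global_skip) = push_skip(no_filter, 5);
       assert_eq!((child.offset, global_skip), (Some(5), None));

       let (child, global_skip) = push_skip(filtered, 5);
       assert_eq!((child.offset, global_skip), (None, Some(5)));
   }
   ```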



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1101,6 +1105,23 @@ impl RowGroupsPrunedParquetOpen {
             None
         };
 
+        // Prune by offset: skip leading fully-matched row groups that fall
+        // entirely within the offset, so they are never read from disk.
+        let remaining_offset = if let Some(offset) = prepared.offset {
+            if offset > 0 {
+                Some(row_groups.prune_by_offset(
+                    offset,
+                    prepared.predicate.is_some(),
+                    rg_metadata,
+                    &prepared.file_metrics,
+                ))
+            } else {
+                None
+            }
+        } else {
+            None

Review Comment:
   Offset pruning is applied even when `prepared.predicate.is_some()` (via 
`prune_by_offset(..., prepared.predicate.is_some(), ...)`). Since the optimizer 
keeps `GlobalLimitExec` when predicates exist (offset not fully handled), 
skipping any fully-matched leading row groups here changes the stream output 
and can cause the global `skip` to be applied twice (wrong results whenever the 
offset spans one or more fully-matched row groups). Consider performing 
offset-based row-group pruning only when the offset is fully handled within 
parquet (currently the `predicate.is_none()` case), or otherwise ensure the 
remaining global skip is reduced accordingly.
   ```suggestion
        // Prune by offset only when parquet can fully handle it. When a
        // predicate exists, the optimizer keeps a global skip/offset above
        // parquet, so pruning fully-matched leading row groups here would
        // change the stream seen by that operator and can apply the offset
        // twice.
        let remaining_offset = match (prepared.offset, prepared.predicate.is_none()) {
            (Some(offset), true) if offset > 0 => Some(row_groups.prune_by_offset(
                offset,
                false,
                rg_metadata,
                &prepared.file_metrics,
            )),
            _ => None,
   ```
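
   For reference, the row-group arithmetic such pruning relies on, and why it 
is only safe for fully-matched groups: a standalone sketch with a hypothetical 
`prune_by_offset_rows` over plain per-group row counts (the real 
`prune_by_offset` operates on row-group metadata and metrics):

   ```rust
   // Sketch: drop leading row groups that fall entirely within the offset and
   // return (row counts of the groups still read, offset remaining inside the
   // first kept group). Only valid when every leading group is fully matched,
   // because raw row counts overstate output rows once a predicate filters.
   fn prune_by_offset_rows(row_counts: &[usize], mut offset: usize) -> (Vec<usize>, usize) {
       let mut keep_from = 0;
       for (i, &rows) in row_counts.iter().enumerate() {
           if offset >= rows {
               offset -= rows; // whole group skipped: never read from disk
               keep_from = i + 1;
           } else {
               break; // offset lands inside this group
           }
       }
       (row_counts[keep_from..].to_vec(), offset)
   }

   fn main() {
       // Three 5-row groups, OFFSET 7: group 0 is skipped entirely, then two
       // more rows are skipped inside group 1.
       assert_eq!(prune_by_offset_rows(&[5, 5, 5], 7), (vec![5, 5], 2));
       // OFFSET exactly at a group boundary: nothing left to skip afterwards.
       assert_eq!(prune_by_offset_rows(&[5, 5, 5], 5), (vec![5, 5], 0));
   }
   ```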



##########
datafusion/sqllogictest/test_files/sort_pushdown.slt:
##########
@@ -2280,3 +2280,146 @@ SET datafusion.execution.collect_statistics = true;
 
 statement ok
 SET datafusion.optimizer.enable_sort_pushdown = true;
+
+# ===========================================================
+# Test N: OFFSET pushdown to parquet
+# Verifies that OFFSET is pushed down to DataSourceExec,
+# eliminating GlobalLimitExec and skipping row groups.
+# ===========================================================
+
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+statement ok
+SET datafusion.execution.parquet.max_row_group_size = 5;
+
+statement ok
+CREATE TABLE tn_data(id INT, value INT) AS VALUES
+(1,10),(2,20),(3,30),(4,40),(5,50),
+(6,60),(7,70),(8,80),(9,90),(10,100),
+(11,110),(12,120),(13,130),(14,140),(15,150);
+
+query I
+COPY (SELECT * FROM tn_data ORDER BY id ASC)
+TO 'test_files/scratch/sort_pushdown/tn_offset/data.parquet';
+----
+15
+
+statement ok
+SET datafusion.execution.parquet.max_row_group_size = 1048576;
+
+statement ok
+CREATE EXTERNAL TABLE tn_offset(id INT, value INT)
+STORED AS PARQUET
+LOCATION 'test_files/scratch/sort_pushdown/tn_offset/data.parquet';
+
+# Test N.1: OFFSET pushdown — offset pushed to DataSourceExec, GlobalLimitExec eliminated
+query TT
+EXPLAIN SELECT * FROM tn_offset LIMIT 3 OFFSET 5;
+----
+logical_plan
+01)Limit: skip=5, fetch=3
+02)--TableScan: tn_offset projection=[id, value], fetch=8
+physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/tn_offset/data.parquet]]}, projection=[id, value], limit=8, offset=5, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet
+
+# Test N.2: Results skip first 5 rows, return next 3
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 5;
+----
+6 60
+7 70
+8 80
+
+# Test N.3: Large offset near end
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 12;
+----
+13 130
+14 140
+15 150
+
+# Test N.4: Offset at last few rows
+query II
+SELECT * FROM tn_offset LIMIT 10 OFFSET 13;
+----
+14 140
+15 150
+
+# Test N.5: Offset exactly at boundary (skip first RG entirely)
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 5;
+----
+6 60
+7 70
+8 80
+
+# Test N.6: Offset zero (no skip)
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 0;
+----
+1 10
+2 20
+3 30
+
+# Test N.7: LIMIT 5 OFFSET 10 spanning RG boundaries
+# With max_row_group_size=5, we have 3 RGs: [1-5], [6-10], [11-15]
+# OFFSET 10 skips rows 1-10, LIMIT 5 returns rows 11-15
+query II
+SELECT * FROM tn_offset LIMIT 5 OFFSET 10;
+----
+11 110
+12 120
+13 130
+14 140
+15 150
+
+# Test N.8: OFFSET with multi-partition (target_partitions=4)
+statement ok
+SET datafusion.execution.target_partitions = 4;
+
+query II
+SELECT * FROM tn_offset LIMIT 3 OFFSET 5;
+----
+6 60
+7 70
+8 80
+
+# Restore single partition for remaining tests
+statement ok
+SET datafusion.execution.target_partitions = 1;
+
+# Test N.9: OFFSET with WHERE clause — offset still works
+# (GlobalLimitExec may remain when there is a filter)
+query II
+SELECT * FROM tn_offset WHERE value > 50 LIMIT 3 OFFSET 2;
+----
+8 80
+9 90
+10 100
+

Review Comment:
   The WHERE-clause coverage here uses `OFFSET 2`, which does not exercise the 
case where the offset spans at least one fully-matched row group under a 
predicate (the scenario that can break if offset is both pushed into parquet 
and also applied by `GlobalLimitExec`). Consider adding a variant like `WHERE 
value > 50 LIMIT 3 OFFSET 7` (with 5-row row groups) to ensure correctness when 
the offset crosses a fully-matched row group boundary under filtering.
   ```suggestion
   
   # Test N.9b: OFFSET with WHERE clause crossing a fully-matched row-group boundary
   # Ensures correctness when OFFSET may be pushed into parquet and also applied by GlobalLimitExec
   query II
   SELECT * FROM tn_offset WHERE value > 50 LIMIT 3 OFFSET 7;
   ----
   13 130
   14 140
   15 150
   ```
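
   For the suggested expected output: with 5-row row groups [1-5], [6-10], 
[11-15], the predicate `value > 50` fully matches groups 2 and 3 (values 
60..150), so OFFSET 7 consumes all of group 2 plus two rows of group 3. A 
quick standalone check of that arithmetic with plain iterators (illustration 
only, not DataFusion):

   ```rust
   fn main() {
       // (id, value) rows 1..=15 with value = id * 10, as in tn_offset.
       let rows: Vec<(i32, i32)> = (1..=15).map(|id| (id, id * 10)).collect();
       let result: Vec<(i32, i32)> = rows
           .into_iter()
           .filter(|&(_, v)| v > 50) // WHERE value > 50 -> ids 6..=15
           .skip(7)                  // OFFSET 7 crosses the fully-matched group [6-10]
           .take(3)                  // LIMIT 3
           .collect();
       assert_eq!(result, vec![(13, 130), (14, 140), (15, 150)]);
   }
   ```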



##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -849,6 +871,28 @@ impl DataSource for FileScanConfig {
         self.limit
     }
 
+    fn with_offset(&self, offset: usize) -> Option<Arc<dyn DataSource>> {
+        if !self.file_source.supports_offset() {
+            return None;
+        }
+        let source = FileScanConfigBuilder::from(self.clone())
+            .with_offset(Some(offset))
+            .build();
+        Some(Arc::new(source))
+    }
+
+    fn offset(&self) -> Option<usize> {
+        self.offset
+    }
+
+    fn offset_fully_handled(&self) -> bool {
+        // Offset is fully handled when set AND no filter —
+        // raw row counts are accurate for offset calculation.
+        // With filters, only fully-matched RGs can be skipped,
+        // GlobalLimitExec handles the rest.
+        self.offset.is_some() && self.file_source.filter().is_none()

Review Comment:
   `offset_fully_handled()` currently returns true whenever `offset` is set and 
there is no filter, but it does not account for multi-partition scans. 
`FileStream` applies `limit` per output partition, so removing 
`GlobalLimitExec` when `file_groups.len() > 1` can yield more than `fetch` rows 
(and make OFFSET/LIMIT semantics depend on partitioning). Consider requiring a 
single output partition (e.g. `self.file_groups.len() == 1` / 
`output_partitioning().partition_count() == 1`) before reporting the offset as 
fully handled.
   ```suggestion
           // Offset is fully handled only when set, no filter is present,
           // and the scan has a single output partition.
           //
           // With filters, only fully-matched RGs can be skipped and
           // GlobalLimitExec handles the rest. Likewise, multi-partition
           // scans must retain global limit enforcement because FileStream
           // applies limits per output partition.
           self.offset.is_some()
               && self.file_source.filter().is_none()
               && self.file_groups.len() == 1
   ```
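
   To make the per-partition hazard concrete: when each partition enforces 
`fetch` independently, the plan as a whole can emit up to `partitions * fetch` 
rows once `GlobalLimitExec` is removed. A toy model (hypothetical 
`rows_emitted` helper, not `FileStream` itself):

   ```rust
   // Toy model of per-partition limit enforcement: each partition truncates
   // its own output to `fetch`, so without a global limit the total can
   // exceed the requested fetch.
   fn rows_emitted(rows_per_partition: &[usize], fetch: usize) -> usize {
       rows_per_partition.iter().map(|&n| n.min(fetch)).sum()
   }

   fn main() {
       // Two file groups of 8 rows each with fetch = 3: the scan emits 6
       // rows, not 3, so GlobalLimitExec must stay unless there is exactly
       // one output partition.
       assert_eq!(rows_emitted(&[8, 8], 3), 6);
       assert_eq!(rows_emitted(&[8], 3), 3); // single partition: safe
   }
   ```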



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

