alamb commented on code in PR #21828:
URL: https://github.com/apache/datafusion/pull/21828#discussion_r3142492406


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1126,6 +1147,34 @@ impl RowGroupsPrunedParquetOpen {
         // Prepare the access plan (extract row groups and row selection)
         let mut prepared_plan = access_plan.prepare(rg_metadata)?;
 
+        // Handle remaining offset via RowSelection — only when there's NO
+        // predicate (all rows match, raw row count is accurate for offset).
+        // With predicates, GlobalLimitExec handles remaining offset since
+        // we can't know how many rows survive the filter.
+        if let Some(remaining_offset) = remaining_offset

Review Comment:
   would it be possible to move this logic into a method on `prepared_plan` or 
`access_plan`?
   
   It seems 
   1. somewhat out of place in the middle of the state machine for the opener 🤔 
   2. that it only updates a field on `prepared_plan`
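A hedged sketch of that refactor: fold the remaining-offset bookkeeping into a method on the prepared plan itself, so the opener's state machine makes a single call. `PreparedPlan`, `selection`, and `apply_remaining_offset` are illustrative names, not the actual DataFusion types.

```rust
// Illustrative stand-in for the prepared access plan (not the real type).
struct PreparedPlan {
    /// Run-length row selection as (skip, row_count) pairs.
    selection: Vec<(bool, usize)>,
    has_predicate: bool,
}

impl PreparedPlan {
    /// Fold a remaining offset into the row selection. Only valid when
    /// there is no predicate, since the raw row count is then exact;
    /// with a filter, the offset must be applied after filtering.
    fn apply_remaining_offset(&mut self, remaining: usize) {
        if self.has_predicate || remaining == 0 {
            return;
        }
        // Prepend a "skip" run covering the remaining offset.
        self.selection.insert(0, (true, remaining));
    }
}
```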



##########
datafusion/datasource-parquet/src/row_group_filter.rs:
##########
@@ -212,6 +212,51 @@ impl RowGroupAccessPlanFilter {
         }
     }
 
+    /// Prune row groups that can be entirely skipped due to offset.

Review Comment:
   this is cool
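For readers skimming the thread, the idea being praised is roughly this: leading row groups whose entire row count fits inside the offset can be skipped without ever being read. A minimal self-contained sketch (not the PR's actual `prune_by_offset` signature), which returns the kept row-group indices and the leftover offset:

```rust
// Consume the offset against leading row groups; once a row group does
// not fit entirely inside the offset, keep it and everything after it.
fn prune_by_offset(row_counts: &[usize], mut offset: usize) -> (Vec<usize>, usize) {
    let mut kept = Vec::new();
    for (idx, &n) in row_counts.iter().enumerate() {
        if kept.is_empty() && offset >= n {
            offset -= n; // whole row group falls inside the offset: skip it
        } else {
            kept.push(idx);
        }
    }
    (kept, offset)
}
```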



##########
datafusion/datasource/src/source.rs:
##########
@@ -174,6 +174,23 @@ pub trait DataSource: Any + Send + Sync + Debug {
     /// Return a copy of this DataSource with a new fetch limit
     fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
     fn fetch(&self) -> Option<usize>;
+
+    /// Return a copy of this DataSource with a new offset (number of rows to 
skip).
+    /// Returns `None` if the data source does not support offset pushdown.
+    fn with_offset(&self, _offset: usize) -> Option<Arc<dyn DataSource>> {
+        None
+    }
+
+    /// Gets the offset for the operator, `None` means there is no offset.
+    fn offset(&self) -> Option<usize> {
+        None
+    }
+
+    /// Whether offset is fully handled (no need for GlobalLimitExec skip).

Review Comment:
   I am confused by this API -- if the datasource returns `Some(..)` for 
offset, then I expect that the DataSource will correctly implement the offset 
handling. It seems error-prone if callers **ALSO** have to remember to check 
`offset_fully_handled`
   
   Perhaps we could change `FileScanConfig` so that if a filter is ever set it 
clears out the offset (or better yet, use an enum so it is not possible to set 
the offset and a filter at the same time)
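A sketch of that enum idea, making offset and filter mutually exclusive by construction. All names here are illustrative, not the real `FileScanConfig` API:

```rust
// One restriction at a time: the type system rules out offset + filter.
#[derive(Debug)]
enum ScanRestriction {
    /// No pushdown: read all rows.
    None,
    /// Skip the first `n` rows; sound only because no filter is applied.
    Offset(usize),
    /// Apply a predicate; any offset must be handled after filtering.
    Filter(String), // placeholder for a real predicate type
}

impl ScanRestriction {
    /// The offset the scan itself will apply, if any.
    fn offset(&self) -> Option<usize> {
        match self {
            ScanRestriction::Offset(n) => Some(*n),
            _ => None,
        }
    }
}
```

With this shape there is no separate `offset_fully_handled` flag to forget: returning `Some(..)` from `offset()` already implies the scan applies it.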



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -1101,6 +1105,23 @@ impl RowGroupsPrunedParquetOpen {
             None
         };
 
+        // Prune by offset: skip leading fully-matched row groups that fall
+        // entirely within the offset, so they are never read from disk.
+        let remaining_offset = if let Some(offset) = prepared.offset {
+            if offset > 0 {
+                Some(row_groups.prune_by_offset(
+                    offset,
+                    prepared.predicate.is_some(),
+                    rg_metadata,
+                    &prepared.file_metrics,
+                ))
+            } else {
+                None
+            }
+        } else {
+            None

Review Comment:
   This sounds like a real issue to look into (and cover with a test)
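As a side note on the quoted diff, the nested `if let` / `if offset > 0` can be flattened with `Option` combinators. In this sketch `prune` stands in for the call to `prune_by_offset`; names are illustrative:

```rust
// Equivalent to: if let Some(offset) = offset { if offset > 0 { Some(prune(offset)) } else { None } } else { None }
fn remaining_offset(
    offset: Option<usize>,
    prune: impl FnOnce(usize) -> usize,
) -> Option<usize> {
    offset.filter(|&o| o > 0).map(prune)
}
```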



##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -169,6 +169,10 @@ pub struct FileScanConfig {
     /// The maximum number of records to read from this plan. If `None`,
     /// all records after filtering are returned.
     pub limit: Option<usize>,
+    /// The number of rows to skip before returning results.
+    /// When combined with `limit`, this enables efficient OFFSET handling
+    /// at the file scan level by skipping entire row groups when possible.
+    pub offset: Option<usize>,

Review Comment:
   I think technically this is a public API and thus an API change (so we 
should label the PR accordingly)



##########
datafusion/datasource/src/file_scan_config/mod.rs:
##########
@@ -849,6 +871,28 @@ impl DataSource for FileScanConfig {
         self.limit
     }
 
+    fn with_offset(&self, offset: usize) -> Option<Arc<dyn DataSource>> {
+        if !self.file_source.supports_offset() {
+            return None;
+        }
+        let source = FileScanConfigBuilder::from(self.clone())
+            .with_offset(Some(offset))
+            .build();
+        Some(Arc::new(source))
+    }
+
+    fn offset(&self) -> Option<usize> {
+        self.offset
+    }
+
+    fn offset_fully_handled(&self) -> bool {

Review Comment:
   It would help me if you could add a comment explaining what this method is 
for, and maybe give it a more descriptive name. 
   
   I found "handled" a bit generic -- maybe a name like `can_apply_offset` 
would be clearer
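A sketch of that suggestion: the renamed method with a doc comment spelling out the contract. The `FileScanConfig` fields here are a minimal stand-in, not the real struct:

```rust
// Stand-in for FileScanConfig, reduced to the two relevant fields.
struct FileScanConfig {
    offset: Option<usize>,
    predicate: Option<String>, // placeholder for a real predicate type
}

impl FileScanConfig {
    /// Returns true when the scan itself will skip exactly `offset` rows,
    /// so no downstream operator needs to skip any. This only holds when
    /// there is no predicate: with a filter, the scan cannot know how many
    /// rows survive, so the offset must be applied after filtering.
    fn can_apply_offset(&self) -> bool {
        self.offset.is_some() && self.predicate.is_none()
    }
}
```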



##########
datafusion/datasource/src/file.rs:
##########
@@ -280,6 +280,13 @@ pub trait FileSource: Any + Send + Sync {
         Ok(SortOrderPushdownResult::Unsupported)
     }
 
+    /// Whether this source fully handles offset at the scan level.
+    /// When true, the optimizer can eliminate GlobalLimitExec's skip.

Review Comment:
   In general I think it would be easier to understand if this documentation 
does't refer to `GlobalLimitExec`.... and instead simply says "can be 
efficiently implemented by the file source" or something like that
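One possible rewording along those lines, shown on a stripped-down stand-in trait (not the real `FileSource` definition):

```rust
// Stand-in trait to show the doc wording; the real trait has many more methods.
trait FileSource {
    /// Whether the row offset can be efficiently applied by the file
    /// source itself (for example, by skipping whole row groups), so no
    /// rows need to be skipped after the scan.
    fn supports_offset(&self) -> bool {
        false
    }
}

struct ParquetLike;
impl FileSource for ParquetLike {
    fn supports_offset(&self) -> bool {
        true // e.g. can skip whole row groups using file metadata
    }
}

struct CsvLike;
impl FileSource for CsvLike {} // falls back to the default: false
```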



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
