This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-53 by this push:
new 19e9c06da9 [branch-53] fix: preserve None projection semantics across FFI boundary in ForeignTableProvider::scan (#20393) (#20895)
19e9c06da9 is described below
commit 19e9c06da985a1f43acf7bfdeea761e694e70030
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 14:07:04 2026 -0400
[branch-53] fix: preserve None projection semantics across FFI boundary in ForeignTableProvider::scan (#20393) (#20895)
- Part of https://github.com/apache/datafusion/issues/19692
This PR:
- Backports https://github.com/apache/datafusion/pull/20393 from
@Kontinuation to the branch-53 line
Co-authored-by: Kristin Cowalcijk <[email protected]>
---
datafusion/ffi/src/table_provider.rs | 77 ++++++++++++++++++++++++++++++++----
1 file changed, 69 insertions(+), 8 deletions(-)
diff --git a/datafusion/ffi/src/table_provider.rs
b/datafusion/ffi/src/table_provider.rs
index df8b648026..1559549e63 100644
--- a/datafusion/ffi/src/table_provider.rs
+++ b/datafusion/ffi/src/table_provider.rs
@@ -108,7 +108,7 @@ pub struct FFI_TableProvider {
scan: unsafe extern "C" fn(
provider: &Self,
session: FFI_SessionRef,
- projections: RVec<usize>,
+ projections: ROption<RVec<usize>>,
filters_serialized: RVec<u8>,
limit: ROption<usize>,
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>>,
@@ -232,7 +232,7 @@ unsafe extern "C" fn supports_filters_pushdown_fn_wrapper(
unsafe extern "C" fn scan_fn_wrapper(
provider: &FFI_TableProvider,
session: FFI_SessionRef,
- projections: RVec<usize>,
+ projections: ROption<RVec<usize>>,
filters_serialized: RVec<u8>,
limit: ROption<usize>,
) -> FfiFuture<FFIResult<FFI_ExecutionPlan>> {
@@ -269,11 +269,12 @@ unsafe extern "C" fn scan_fn_wrapper(
}
};
- let projections: Vec<_> = projections.into_iter().collect();
+ let projections: Option<Vec<usize>> =
+ projections.into_option().map(|p| p.into_iter().collect());
let plan = rresult_return!(
internal_provider
- .scan(session, Some(&projections), &filters, limit.into())
+ .scan(session, projections.as_ref(), &filters, limit.into())
.await
);
@@ -461,8 +462,9 @@ impl TableProvider for ForeignTableProvider {
) -> Result<Arc<dyn ExecutionPlan>> {
let session = FFI_SessionRef::new(session, None,
self.0.logical_codec.clone());
- let projections: Option<RVec<usize>> =
- projection.map(|p| p.iter().map(|v| v.to_owned()).collect());
+ let projections: ROption<RVec<usize>> = projection
+ .map(|p| p.iter().map(|v| v.to_owned()).collect())
+ .into();
let codec: Arc<dyn LogicalExtensionCodec> =
(&self.0.logical_codec).into();
let filter_list = LogicalExprList {
@@ -474,7 +476,7 @@ impl TableProvider for ForeignTableProvider {
let maybe_plan = (self.0.scan)(
&self.0,
session,
- projections.unwrap_or_default(),
+ projections,
filters_serialized,
limit.into(),
)
@@ -658,8 +660,9 @@ mod tests {
let provider = Arc::new(MemTable::try_new(schema,
vec![vec![batch1]])?);
- let ffi_provider =
+ let mut ffi_provider =
FFI_TableProvider::new(provider, true, None, task_ctx_provider,
None);
+ ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
let foreign_table_provider: Arc<dyn TableProvider> =
(&ffi_provider).into();
@@ -712,4 +715,62 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn test_scan_with_none_projection_returns_all_columns() ->
Result<()> {
+ use arrow::datatypes::Field;
+ use datafusion::arrow::array::Float32Array;
+ use datafusion::arrow::datatypes::DataType;
+ use datafusion::arrow::record_batch::RecordBatch;
+ use datafusion::datasource::MemTable;
+ use datafusion::physical_plan::collect;
+
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Float32, false),
+ Field::new("b", DataType::Float32, false),
+ Field::new("c", DataType::Float32, false),
+ ]));
+
+ let batch = RecordBatch::try_new(
+ Arc::clone(&schema),
+ vec![
+ Arc::new(Float32Array::from(vec![1.0, 2.0])),
+ Arc::new(Float32Array::from(vec![3.0, 4.0])),
+ Arc::new(Float32Array::from(vec![5.0, 6.0])),
+ ],
+ )?;
+
+ let provider =
+ Arc::new(MemTable::try_new(Arc::clone(&schema),
vec![vec![batch]])?);
+
+ let ctx = Arc::new(SessionContext::new());
+ let task_ctx_provider = Arc::clone(&ctx) as Arc<dyn
TaskContextProvider>;
+ let task_ctx_provider =
FFI_TaskContextProvider::from(&task_ctx_provider);
+
+ // Wrap in FFI and force the foreign path (not local bypass)
+ let mut ffi_provider =
+ FFI_TableProvider::new(provider, true, None, task_ctx_provider,
None);
+ ffi_provider.library_marker_id = crate::mock_foreign_marker_id;
+
+ let foreign_table_provider: Arc<dyn TableProvider> =
(&ffi_provider).into();
+
+ // Call scan with projection=None, meaning "return all columns"
+ let plan = foreign_table_provider
+ .scan(&ctx.state(), None, &[], None)
+ .await?;
+ assert_eq!(
+ plan.schema().fields().len(),
+ 3,
+ "scan(projection=None) should return all columns; got {}",
+ plan.schema().fields().len()
+ );
+
+ // Also verify we can execute and get correct data
+ let batches = collect(plan, ctx.task_ctx()).await?;
+ assert_eq!(batches.len(), 1);
+ assert_eq!(batches[0].num_columns(), 3);
+ assert_eq!(batches[0].num_rows(), 2);
+
+ Ok(())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]