This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-python.git
The following commit(s) were added to refs/heads/main by this push:
new 21990b0b feat: Add FFI_TableProviderFactory support (#1396)
21990b0b is described below
commit 21990b0bb01599fb67dbd8686c907e5f810aace3
Author: Paul J. Davis <[email protected]>
AuthorDate: Mon Mar 16 07:07:50 2026 -0500
feat: Add FFI_TableProviderFactory support (#1396)
* feat: Add FFI_TableProviderFactory support
This wraps the new FFI_TableProviderFactory APIs in datafusion-ffi.
* Address PR comments
* Add support for Python based TableProviderFactory
This adds the ability to register Python based TableProviderFactory
instances to the SessionContext.
* Correction after rebase
---------
Co-authored-by: Tim Saucer <[email protected]>
---
crates/core/src/context.rs | 42 ++++++++++-
crates/core/src/table.rs | 59 ++++++++++++++-
.../python/tests/_test_table_provider_factory.py | 41 ++++++++++
examples/datafusion-ffi-example/src/lib.rs | 3 +
.../src/table_provider_factory.rs | 87 ++++++++++++++++++++++
python/datafusion/catalog.py | 19 +++++
python/datafusion/context.py | 18 +++++
python/tests/test_catalog.py | 27 +++++++
8 files changed, 291 insertions(+), 5 deletions(-)
diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs
index 709fdc5a..200b6470 100644
--- a/crates/core/src/context.rs
+++ b/crates/core/src/context.rs
@@ -27,7 +27,7 @@ use arrow::pyarrow::FromPyArrow;
use datafusion::arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
-use datafusion::catalog::{CatalogProvider, CatalogProviderList};
+use datafusion::catalog::{CatalogProvider, CatalogProviderList,
TableProviderFactory};
use datafusion::common::{ScalarValue, TableReference, exec_err};
use
datafusion::datasource::file_format::file_compression_type::FileCompressionType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
@@ -51,6 +51,7 @@ use datafusion_ffi::catalog_provider::FFI_CatalogProvider;
use datafusion_ffi::catalog_provider_list::FFI_CatalogProviderList;
use datafusion_ffi::execution::FFI_TaskContextProvider;
use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory;
use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;
use datafusion_python_util::{
create_logical_extension_capsule, ffi_logical_codec_from_pycapsule,
get_global_ctx,
@@ -81,7 +82,7 @@ use crate::record_batch::PyRecordBatchStream;
use crate::sql::logical::PyLogicalPlan;
use crate::sql::util::replace_placeholders_with_strings;
use crate::store::StorageContexts;
-use crate::table::PyTable;
+use crate::table::{PyTable, RustWrappedPyTableProviderFactory};
use crate::udaf::PyAggregateUDF;
use crate::udf::PyScalarUDF;
use crate::udtf::PyTableFunction;
@@ -659,6 +660,43 @@ impl PySessionContext {
Ok(())
}
+ pub fn register_table_factory(
+ &self,
+ format: &str,
+ mut factory: Bound<'_, PyAny>,
+ ) -> PyDataFusionResult<()> {
+ if factory.hasattr("__datafusion_table_provider_factory__")? {
+ let py = factory.py();
+ let codec_capsule = create_logical_extension_capsule(py,
self.logical_codec.as_ref())?;
+ factory = factory
+ .getattr("__datafusion_table_provider_factory__")?
+ .call1((codec_capsule,))?;
+ }
+
+ let factory: Arc<dyn TableProviderFactory> =
+ if let Ok(capsule) =
factory.cast::<PyCapsule>().map_err(py_datafusion_err) {
+ validate_pycapsule(capsule,
"datafusion_table_provider_factory")?;
+
+ let data: NonNull<FFI_TableProviderFactory> = capsule
+
.pointer_checked(Some(c_str!("datafusion_table_provider_factory")))?
+ .cast();
+ let factory = unsafe { data.as_ref() };
+ factory.into()
+ } else {
+ Arc::new(RustWrappedPyTableProviderFactory::new(
+ factory.into(),
+ self.logical_codec.clone(),
+ ))
+ };
+
+ let st = self.ctx.state_ref();
+ let mut lock = st.write();
+ lock.table_factories_mut()
+ .insert(format.to_owned(), factory);
+
+ Ok(())
+ }
+
pub fn register_catalog_provider_list(
&self,
mut provider: Bound<PyAny>,
diff --git a/crates/core/src/table.rs b/crates/core/src/table.rs
index 3dfe3e9c..62334977 100644
--- a/crates/core/src/table.rs
+++ b/crates/core/src/table.rs
@@ -21,19 +21,24 @@ use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use arrow::pyarrow::ToPyArrow;
use async_trait::async_trait;
-use datafusion::catalog::Session;
+use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::common::Column;
use datafusion::datasource::{TableProvider, TableType};
-use datafusion::logical_expr::{Expr, LogicalPlanBuilder,
TableProviderFilterPushDown};
+use datafusion::logical_expr::{
+ CreateExternalTable, Expr, LogicalPlanBuilder, TableProviderFilterPushDown,
+};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::DataFrame;
-use datafusion_python_util::table_provider_from_pycapsule;
+use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
+use datafusion_python_util::{create_logical_extension_capsule,
table_provider_from_pycapsule};
use pyo3::IntoPyObjectExt;
use pyo3::prelude::*;
use crate::context::PySessionContext;
use crate::dataframe::PyDataFrame;
use crate::dataset::Dataset;
+use crate::errors;
+use crate::expr::create_external_table::PyCreateExternalTable;
/// This struct is used as a common method for all TableProviders,
/// whether they refer to an FFI provider, an internally known
@@ -206,3 +211,51 @@ impl TableProvider for TempViewTable {
Ok(vec![TableProviderFilterPushDown::Exact; filters.len()])
}
}
+
+#[derive(Debug)]
+pub(crate) struct RustWrappedPyTableProviderFactory {
+ pub(crate) table_provider_factory: Py<PyAny>,
+ pub(crate) codec: Arc<FFI_LogicalExtensionCodec>,
+}
+
+impl RustWrappedPyTableProviderFactory {
+ pub fn new(table_provider_factory: Py<PyAny>, codec:
Arc<FFI_LogicalExtensionCodec>) -> Self {
+ Self {
+ table_provider_factory,
+ codec,
+ }
+ }
+
+ fn create_inner(
+ &self,
+ cmd: CreateExternalTable,
+ codec: Bound<PyAny>,
+ ) -> PyResult<Arc<dyn TableProvider>> {
+ Python::attach(|py| {
+ let provider = self.table_provider_factory.bind(py);
+ let cmd = PyCreateExternalTable::from(cmd);
+
+ provider
+ .call_method1("create", (cmd,))
+ .and_then(|t| PyTable::new(t, Some(codec)))
+ .map(|t| t.table())
+ })
+ }
+}
+
+#[async_trait]
+impl TableProviderFactory for RustWrappedPyTableProviderFactory {
+ async fn create(
+ &self,
+ _: &dyn Session,
+ cmd: &CreateExternalTable,
+ ) -> datafusion::common::Result<Arc<dyn TableProvider>> {
+ Python::attach(|py| {
+ let codec = create_logical_extension_capsule(py,
self.codec.as_ref())
+ .map_err(errors::to_datafusion_err)?;
+
+ self.create_inner(cmd.clone(), codec.into_any())
+ .map_err(errors::to_datafusion_err)
+ })
+ }
+}
diff --git
a/examples/datafusion-ffi-example/python/tests/_test_table_provider_factory.py
b/examples/datafusion-ffi-example/python/tests/_test_table_provider_factory.py
new file mode 100644
index 00000000..b1e94ec7
--- /dev/null
+++
b/examples/datafusion-ffi-example/python/tests/_test_table_provider_factory.py
@@ -0,0 +1,41 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import annotations
+
+from datafusion import SessionContext
+from datafusion_ffi_example import MyTableProviderFactory
+
+
+def test_table_provider_factory_ffi() -> None:
+ ctx = SessionContext()
+ table = MyTableProviderFactory()
+
+ ctx.register_table_factory("MY_FORMAT", table)
+
+ # Create a new external table
+ ctx.sql("""
+ CREATE EXTERNAL TABLE
+ foo
+ STORED AS my_format
+ LOCATION '';
+ """).collect()
+
+ # Query the pre-populated table
+ result = ctx.sql("SELECT * FROM foo;").collect()
+ assert len(result) == 2
+ assert result[0].num_columns == 2
diff --git a/examples/datafusion-ffi-example/src/lib.rs
b/examples/datafusion-ffi-example/src/lib.rs
index 3627c149..68120a4c 100644
--- a/examples/datafusion-ffi-example/src/lib.rs
+++ b/examples/datafusion-ffi-example/src/lib.rs
@@ -22,6 +22,7 @@ use crate::catalog_provider::{FixedSchemaProvider,
MyCatalogProvider, MyCatalogP
use crate::scalar_udf::IsNullUDF;
use crate::table_function::MyTableFunction;
use crate::table_provider::MyTableProvider;
+use crate::table_provider_factory::MyTableProviderFactory;
use crate::window_udf::MyRankUDF;
pub(crate) mod aggregate_udf;
@@ -29,6 +30,7 @@ pub(crate) mod catalog_provider;
pub(crate) mod scalar_udf;
pub(crate) mod table_function;
pub(crate) mod table_provider;
+pub(crate) mod table_provider_factory;
pub(crate) mod window_udf;
#[pymodule]
@@ -36,6 +38,7 @@ fn datafusion_ffi_example(m: &Bound<'_, PyModule>) ->
PyResult<()> {
pyo3_log::init();
m.add_class::<MyTableProvider>()?;
+ m.add_class::<MyTableProviderFactory>()?;
m.add_class::<MyTableFunction>()?;
m.add_class::<MyCatalogProvider>()?;
m.add_class::<MyCatalogProviderList>()?;
diff --git a/examples/datafusion-ffi-example/src/table_provider_factory.rs
b/examples/datafusion-ffi-example/src/table_provider_factory.rs
new file mode 100644
index 00000000..53248a90
--- /dev/null
+++ b/examples/datafusion-ffi-example/src/table_provider_factory.rs
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use datafusion_catalog::{Session, TableProvider, TableProviderFactory};
+use datafusion_common::error::Result as DataFusionResult;
+use datafusion_expr::CreateExternalTable;
+use datafusion_ffi::table_provider_factory::FFI_TableProviderFactory;
+use datafusion_python_util::ffi_logical_codec_from_pycapsule;
+use pyo3::types::PyCapsule;
+use pyo3::{Bound, PyAny, PyResult, Python, pyclass, pymethods};
+
+use crate::catalog_provider;
+
+#[derive(Debug)]
+pub(crate) struct ExampleTableProviderFactory {}
+
+impl ExampleTableProviderFactory {
+ fn new() -> Self {
+ Self {}
+ }
+}
+
+#[async_trait]
+impl TableProviderFactory for ExampleTableProviderFactory {
+ async fn create(
+ &self,
+ _state: &dyn Session,
+ _cmd: &CreateExternalTable,
+ ) -> DataFusionResult<Arc<dyn TableProvider>> {
+ Ok(catalog_provider::my_table())
+ }
+}
+
+#[pyclass(
+ name = "MyTableProviderFactory",
+ module = "datafusion_ffi_example",
+ subclass
+)]
+#[derive(Debug)]
+pub struct MyTableProviderFactory {
+ inner: Arc<ExampleTableProviderFactory>,
+}
+
+impl Default for MyTableProviderFactory {
+ fn default() -> Self {
+ let inner = Arc::new(ExampleTableProviderFactory::new());
+ Self { inner }
+ }
+}
+
+#[pymethods]
+impl MyTableProviderFactory {
+ #[new]
+ pub fn new() -> Self {
+ Self::default()
+ }
+
+ pub fn __datafusion_table_provider_factory__<'py>(
+ &self,
+ py: Python<'py>,
+ codec: Bound<PyAny>,
+ ) -> PyResult<Bound<'py, PyCapsule>> {
+ let name = cr"datafusion_table_provider_factory".into();
+ let codec = ffi_logical_codec_from_pycapsule(codec)?;
+ let factory = Arc::clone(&self.inner) as Arc<dyn TableProviderFactory
+ Send>;
+ let factory = FFI_TableProviderFactory::new_with_ffi_codec(factory,
None, codec);
+
+ PyCapsule::new(py, factory, Some(name))
+ }
+}
diff --git a/python/datafusion/catalog.py b/python/datafusion/catalog.py
index bc43cf34..03c0ddc6 100644
--- a/python/datafusion/catalog.py
+++ b/python/datafusion/catalog.py
@@ -29,6 +29,7 @@ if TYPE_CHECKING:
from datafusion import DataFrame, SessionContext
from datafusion.context import TableProviderExportable
+ from datafusion.expr import CreateExternalTable
try:
from warnings import deprecated # Python 3.13+
@@ -243,6 +244,24 @@ class Table:
return self._inner.kind
+class TableProviderFactory(ABC):
+ """Abstract class for defining a Python based Table Provider Factory."""
+
+ @abstractmethod
+ def create(self, cmd: CreateExternalTable) -> Table:
+ """Create a table using the :class:`CreateExternalTable`."""
+ ...
+
+
+class TableProviderFactoryExportable(Protocol):
+ """Type hint for object that has __datafusion_table_provider_factory__
PyCapsule.
+
+
https://docs.rs/datafusion/latest/datafusion/catalog/trait.TableProviderFactory.html
+ """
+
+ def __datafusion_table_provider_factory__(self, session: Any) -> object:
...
+
+
class CatalogProviderList(ABC):
"""Abstract class for defining a Python based Catalog Provider List."""
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index 0d825977..ba9290a5 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -37,6 +37,8 @@ from datafusion.catalog import (
CatalogProviderExportable,
CatalogProviderList,
CatalogProviderListExportable,
+ TableProviderFactory,
+ TableProviderFactoryExportable,
)
from datafusion.dataframe import DataFrame
from datafusion.expr import sort_list_to_raw_sort_list
@@ -830,6 +832,22 @@ class SessionContext:
"""Remove a table from the session."""
self.ctx.deregister_table(name)
+ def register_table_factory(
+ self,
+ format: str,
+ factory: TableProviderFactory | TableProviderFactoryExportable,
+ ) -> None:
+ """Register a :py:class:`~datafusion.TableProviderFactoryExportable`.
+
+ The registered factory can be referenced from SQL DDL statements
executed
+ against this context.
+
+ Args:
+ format: The value to be used in `STORED AS ${format}` clause.
+ factory: A PyCapsule that implements
:class:`TableProviderFactoryExportable`
+ """
+ self.ctx.register_table_factory(format, factory)
+
def catalog_names(self) -> set[str]:
"""Returns the list of catalogs in this context."""
return self.ctx.catalog_names()
diff --git a/python/tests/test_catalog.py b/python/tests/test_catalog.py
index 9310da50..c89da36b 100644
--- a/python/tests/test_catalog.py
+++ b/python/tests/test_catalog.py
@@ -120,6 +120,12 @@ class
CustomCatalogProviderList(dfn.catalog.CatalogProviderList):
self.catalogs[name] = catalog
+class CustomTableProviderFactory(dfn.catalog.TableProviderFactory):
+ def create(self, cmd: dfn.expr.CreateExternalTable):
+ assert cmd.name() == "test_table_factory"
+ return create_dataset()
+
+
def test_python_catalog_provider_list(ctx: SessionContext):
ctx.register_catalog_provider_list(CustomCatalogProviderList())
@@ -314,3 +320,24 @@ def test_register_python_function_as_udtf(ctx:
SessionContext):
assert len(result[0]) == 1
assert len(result[0][0]) == 1
assert result[0][0][0].as_py() == 3
+
+
+def test_register_python_table_provider_factory(ctx: SessionContext):
+ ctx.register_table_factory("CUSTOM_FACTORY", CustomTableProviderFactory())
+
+ ctx.sql("""
+ CREATE EXTERNAL TABLE test_table_factory
+ STORED AS CUSTOM_FACTORY
+ LOCATION foo;
+ """).collect()
+
+ result = ctx.sql("SELECT * FROM test_table_factory;").collect()
+
+ expect = [
+ pa.RecordBatch.from_arrays(
+ [pa.array([1, 2, 3]), pa.array([4, 5, 6])],
+ names=["a", "b"],
+ )
+ ]
+
+ assert result == expect
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]