This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch branch-53
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-53 by this push:
     new 129757d74d [branch-53] Reattach parquet metadata cache after deserializing in datafusion-proto (#20574) (#20891)
129757d74d is described below

commit 129757d74ddff3c97f434cb5b0894d68e2713796
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Mar 12 11:49:22 2026 -0400

    [branch-53] Reattach parquet metadata cache after deserializing in datafusion-proto (#20574) (#20891)
    
    - Part of https://github.com/apache/datafusion/issues/19692
    - Closes https://github.com/apache/datafusion/issues/20575 on branch-53
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/20574 from
    @nathanb9 to the branch-53 line
    
    Co-authored-by: nathan <[email protected]>
---
 datafusion/proto/src/physical_plan/mod.rs          | 20 ++++++--
 .../proto/tests/cases/roundtrip_physical_plan.rs   | 58 +++++++++++++++++++++-
 2 files changed, 73 insertions(+), 5 deletions(-)

diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs
index 47fa1319c5..85406e31da 100644
--- a/datafusion/proto/src/physical_plan/mod.rs
+++ b/datafusion/proto/src/physical_plan/mod.rs
@@ -42,9 +42,13 @@ use datafusion_datasource_csv::source::CsvSource;
 use datafusion_datasource_json::file_format::JsonSink;
 use datafusion_datasource_json::source::JsonSource;
 #[cfg(feature = "parquet")]
+use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
+#[cfg(feature = "parquet")]
 use datafusion_datasource_parquet::file_format::ParquetSink;
 #[cfg(feature = "parquet")]
 use datafusion_datasource_parquet::source::ParquetSource;
+#[cfg(feature = "parquet")]
+use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_execution::{FunctionRegistry, TaskContext};
 use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
 use datafusion_functions_table::generate_series::{
@@ -848,9 +852,19 @@ impl protobuf::PhysicalPlanNode {
 
             // Parse table schema with partition columns
             let table_schema = parse_table_schema_from_proto(base_conf)?;
-
-            let mut source =
-                ParquetSource::new(table_schema).with_table_parquet_options(options);
+            let object_store_url = match base_conf.object_store_url.is_empty() {
+                false => ObjectStoreUrl::parse(&base_conf.object_store_url)?,
+                true => ObjectStoreUrl::local_filesystem(),
+            };
+            let store = ctx.runtime_env().object_store(object_store_url)?;
+            let metadata_cache =
+                ctx.runtime_env().cache_manager.get_file_metadata_cache();
+            let reader_factory =
+                Arc::new(CachedParquetFileReaderFactory::new(store, metadata_cache));
+
+            let mut source = ParquetSource::new(table_schema)
+                .with_parquet_file_reader_factory(reader_factory)
+                .with_table_parquet_options(options);
 
             if let Some(predicate) = predicate {
                 source = source.with_predicate(predicate);
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 2b8c1056f3..66ca903e4e 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -36,8 +36,9 @@ use datafusion::datasource::listing::{
 };
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::datasource::physical_plan::{
-    ArrowSource, FileGroup, FileOutputMode, FileScanConfigBuilder, FileSinkConfig,
-    ParquetSource, wrap_partition_type_in_dict, wrap_partition_value_in_dict,
+    ArrowSource, FileGroup, FileOutputMode, FileScanConfig, FileScanConfigBuilder,
+    FileSinkConfig, ParquetSource, wrap_partition_type_in_dict,
+    wrap_partition_value_in_dict,
 };
 use datafusion::datasource::sink::DataSinkExec;
 use datafusion::datasource::source::DataSourceExec;
@@ -942,6 +943,59 @@ fn roundtrip_parquet_exec_with_pruning_predicate() -> Result<()> {
     roundtrip_test(DataSourceExec::from_data_source(scan_config))
 }
 
+#[test]
+fn roundtrip_parquet_exec_attaches_cached_reader_factory_after_roundtrip() -> Result<()> {
+    let file_schema =
+        Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)]));
+    let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema)));
+    let scan_config =
+        FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source)
+            .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new(
+                "/path/to/file.parquet".to_string(),
+                1024,
+            )])])
+            .with_statistics(Statistics {
+                num_rows: Precision::Inexact(100),
+                total_byte_size: Precision::Inexact(1024),
+                column_statistics: Statistics::unknown_column(&file_schema),
+            })
+            .build();
+    let exec_plan = DataSourceExec::from_data_source(scan_config);
+
+    let ctx = SessionContext::new();
+    let codec = DefaultPhysicalExtensionCodec {};
+    let proto_converter = DefaultPhysicalProtoConverter {};
+    let roundtripped =
+        roundtrip_test_and_return(exec_plan, &ctx, &codec, &proto_converter)?;
+
+    let data_source = roundtripped
+        .as_any()
+        .downcast_ref::<DataSourceExec>()
+        .ok_or_else(|| {
+            internal_datafusion_err!("Expected DataSourceExec after roundtrip")
+        })?;
+    let file_scan = data_source
+        .data_source()
+        .as_any()
+        .downcast_ref::<FileScanConfig>()
+        .ok_or_else(|| {
+            internal_datafusion_err!("Expected FileScanConfig after roundtrip")
+        })?;
+    let parquet_source = file_scan
+        .file_source()
+        .as_any()
+        .downcast_ref::<ParquetSource>()
+        .ok_or_else(|| {
+            internal_datafusion_err!("Expected ParquetSource after roundtrip")
+        })?;
+
+    assert!(
+        parquet_source.parquet_file_reader_factory().is_some(),
+        "Parquet reader factory should be attached after decoding from protobuf"
+    );
+    Ok(())
+}
+
 #[test]
 fn roundtrip_arrow_scan() -> Result<()> {
     let file_schema =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to