paleolimbot commented on code in PR #797:
URL: https://github.com/apache/sedona-db/pull/797#discussion_r3171822256


##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -579,37 +560,45 @@ impl FileSource for GeoParquetFileSource {
         Arc::new(source)
     }
 
-    fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
-        let mut source = Self::from_file_source(
-            self.inner.with_schema(schema),
-            self.metadata_size_hint,
-            self.predicate.clone(),
-        );
-        source.options = self.options.clone();
-        source.metadata_cache = self.metadata_cache.clone();
-        Arc::new(source)
-    }
-
-    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
-        let mut source = Self::from_file_source(
-            self.inner.with_projection(config),
-            self.metadata_size_hint,
-            self.predicate.clone(),
-        );
-        source.options = self.options.clone();
-        source.metadata_cache = self.metadata_cache.clone();
-        Arc::new(source)
+    fn try_pushdown_projection(
+        &self,
+        projection: &ProjectionExprs,
+    ) -> Result<Option<Arc<dyn FileSource>>> {
+        // DataFusion 52 has an issue where field metadata (like 
ARROW:extension:name)
+        // is stripped when evaluating embedded projections in ParquetOpener. 
This is
+        // because the batch schema comes from the parquet reader (which 
doesn't have
+        // extension metadata), and Column::return_field() looks up fields 
from that schema.
+        // This isn't a bug in DataFusion because we're the ones that 
advertised the table
+        // schema as having metadata'd expressions in the first place.
+        //
+        // We fix this by wrapping Column expressions with 
MetadataPreservingColumn,
+        // which stores the correct field from the file schema and returns it 
from
+        // return_field() regardless of the input schema.
+        let transformed_projection = wrap_columns_with_metadata_preserving(
+            projection.clone(),
+            self.inner.table_schema().table_schema(),
+        )?;

Review Comment:
   This is a somewhat involved change. Opus did a great job root-causing the 
problem but a very bad job solving it, and it took me some time to fix. Because we 
advertise a file schema that is not the underlying Parquet file schema, we have 
expressions whose columns refer to a schema that the inner opener assumes is 
identical to the file schema (but it is not!). I hope we can remove this wrapper 
at some point and/or simplify our Parquet wrapping strategy. For now, this is 
a minimally invasive strategy that ensures we don't lose the benefits of the 
wrapped Parquet implementation.



##########
rust/sedona-datasource/src/format.rs:
##########
@@ -275,34 +294,32 @@ impl FileSource for ExternalFileSource {
         .with_updated_node(Arc::new(source)))
     }
 
-    fn as_any(&self) -> &dyn Any {
-        self
-    }
-
-    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
-        Arc::new(Self {
-            batch_size: Some(batch_size),
+    fn try_pushdown_projection(
+        &self,
+        projection: &ProjectionExprs,
+    ) -> Result<Option<Arc<dyn FileSource>>> {
+        // Use SplitProjection to handle any projection:
+        // - file_indices provides column pruning (always works)
+        // - ProjectionOpener handles reordering/expressions/renames after 
reading
+        let split_projection = 
SplitProjection::new(self.table_schema.file_schema(), projection);

Review Comment:
   This projection pushdown change is a key difference in DataFusion 52. For the 
generic datasource (which we use for GDAL input), we can use the built-in 
pattern for bridging this to the previous approach (simple column indices).



##########
python/sedonadb/src/dataframe.rs:
##########
@@ -390,8 +391,14 @@ impl InternalDataFrame {
     ) -> Result<Bound<'py, PyCapsule>, PySedonaError> {
         let name = cr"datafusion_table_provider".into();
         let provider = self.inner.clone().into_view();
-        let ffi_provider =
-            FFI_TableProvider::new(provider, true, 
Some(self.runtime.handle().clone()));
+        let ctx = Arc::new(SessionContext::new()) as Arc<dyn 
TaskContextProvider>;
+        let ffi_provider = FFI_TableProvider::new(
+            provider,
+            true,
+            Some(self.runtime.handle().clone()),
+            &ctx,
+            None,
+        );

Review Comment:
   A few FFI things changed... We currently only use this when there are 
multiple SessionContexts and we pass data frames between them (and we should 
probably find a better way to do this).



##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -474,32 +481,6 @@ impl GeoParquetFileSource {
         }
     }
 
-    /// Apply a [SchemaAdapterFactory] to the inner [ParquetSource]
-    pub fn with_schema_adapter_factory(
-        &self,
-        schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
-    ) -> Self {

Review Comment:
   The schema adapter factory is no more (there is something called a physical 
expression adapter factory that we may need to add support for later).



##########
rust/sedona-pointcloud/src/las/source.rs:
##########
@@ -81,26 +89,32 @@ impl FileSource for LasSource {
         object_store: Arc<dyn ObjectStore>,
         base_config: &FileScanConfig,
         partition: usize,
-    ) -> Arc<dyn FileOpener> {
-        let projection = base_config
-            .file_column_projection_indices()
-            .unwrap_or_else(|| 
(0..base_config.projected_file_schema().fields().len()).collect());
-
+    ) -> Result<Arc<dyn FileOpener>, DataFusionError> {
         let file_reader_factory = self
             .reader_factory
             .clone()
             .unwrap_or_else(|| 
Arc::new(LasFileReaderFactory::new(object_store, None)));
 
-        Arc::new(LasOpener {
-            projection: Arc::from(projection),
+        let inner_opener: Arc<dyn FileOpener> = Arc::new(LasOpener {
             batch_size: self.batch_size.expect("Must be set"),
             limit: base_config.limit,
             predicate: self.predicate.clone(),
             file_reader_factory,
             options: self.options.clone(),
             partition_count: 
base_config.output_partitioning().partition_count(),
             partition,
-        })
+        });
+
+        // Wrap with ProjectionOpener to handle reordering/expressions
+        if let Some(split_projection) = &self.split_projection {
+            ProjectionOpener::try_new(
+                split_projection.clone(),
+                inner_opener,
+                self.table_schema.file_schema(),
+            )
+        } else {
+            Ok(inner_opener)
+        }

Review Comment:
   cc @b4l... this is to keep up with the changes in DataFusion 52. The current 
tests pass, but I didn't add new ones with more complex projections to check 
(there are some for sedona-datasource that did trigger a failure, and this 
approach fixed the issue there).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to