paleolimbot commented on code in PR #797:
URL: https://github.com/apache/sedona-db/pull/797#discussion_r3171822256
##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -579,37 +560,45 @@ impl FileSource for GeoParquetFileSource {
Arc::new(source)
}
- fn with_schema(&self, schema: TableSchema) -> Arc<dyn FileSource> {
- let mut source = Self::from_file_source(
- self.inner.with_schema(schema),
- self.metadata_size_hint,
- self.predicate.clone(),
- );
- source.options = self.options.clone();
- source.metadata_cache = self.metadata_cache.clone();
- Arc::new(source)
- }
-
- fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
- let mut source = Self::from_file_source(
- self.inner.with_projection(config),
- self.metadata_size_hint,
- self.predicate.clone(),
- );
- source.options = self.options.clone();
- source.metadata_cache = self.metadata_cache.clone();
- Arc::new(source)
+ fn try_pushdown_projection(
+ &self,
+ projection: &ProjectionExprs,
+ ) -> Result<Option<Arc<dyn FileSource>>> {
+ // DataFusion 52 has an issue where field metadata (like
ARROW:extension:name)
+ // is stripped when evaluating embedded projections in ParquetOpener.
This is
+ // because the batch schema comes from the parquet reader (which
doesn't have
+ // extension metadata), and Column::return_field() looks up fields
from that schema.
+ // This isn't a bug in DataFusion because we're the ones that
advertised the table
+ // schema as having metadata'd expressions in the first place.
+ //
+ // We fix this by wrapping Column expressions with
MetadataPreservingColumn,
+ // which stores the correct field from the file schema and returns it
from
+ // return_field() regardless of the input schema.
+ let transformed_projection = wrap_columns_with_metadata_preserving(
+ projection.clone(),
+ self.inner.table_schema().table_schema(),
+ )?;
Review Comment:
This is a somewhat involved change that Opus did a great job root-causing
but a very bad job solving, and it took me some time to fix this. Because we
advertise a file schema that is not the underlying Parquet file schema, we have
expressions whose columns refer to a schema that the inner opener assumes is
identical to the file schema (but it is not!). I hope we can remove this wrapper
at some point and/or simplify our Parquet wrapping strategy, but for now this
is a minimally invasive strategy that ensures we don't lose the benefits of the
wrapped Parquet implementation.
##########
rust/sedona-datasource/src/format.rs:
##########
@@ -275,34 +294,32 @@ impl FileSource for ExternalFileSource {
.with_updated_node(Arc::new(source)))
}
- fn as_any(&self) -> &dyn Any {
- self
- }
-
- fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
- Arc::new(Self {
- batch_size: Some(batch_size),
+ fn try_pushdown_projection(
+ &self,
+ projection: &ProjectionExprs,
+ ) -> Result<Option<Arc<dyn FileSource>>> {
+ // Use SplitProjection to handle any projection:
+ // - file_indices provides column pruning (always works)
+ // - ProjectionOpener handles reordering/expressions/renames after
reading
+ let split_projection =
SplitProjection::new(self.table_schema.file_schema(), projection);
Review Comment:
This projection pushdown bit is a key change in DataFusion 52. For the
generic datasource (which we use for GDAL input) we can use the built-in
pattern for bridging this with the previous approach (simple integers).
##########
python/sedonadb/src/dataframe.rs:
##########
@@ -390,8 +391,14 @@ impl InternalDataFrame {
) -> Result<Bound<'py, PyCapsule>, PySedonaError> {
let name = cr"datafusion_table_provider".into();
let provider = self.inner.clone().into_view();
- let ffi_provider =
- FFI_TableProvider::new(provider, true,
Some(self.runtime.handle().clone()));
+ let ctx = Arc::new(SessionContext::new()) as Arc<dyn
TaskContextProvider>;
+ let ffi_provider = FFI_TableProvider::new(
+ provider,
+ true,
+ Some(self.runtime.handle().clone()),
+ &ctx,
+ None,
+ );
Review Comment:
A few FFI things changed...we don't really use this except currently for the
situation where there are multiple SessionContexts and we pass data frames
between them (and we should probably find a better way to do this).
##########
rust/sedona-geoparquet/src/format.rs:
##########
@@ -474,32 +481,6 @@ impl GeoParquetFileSource {
}
}
- /// Apply a [SchemaAdapterFactory] to the inner [ParquetSource]
- pub fn with_schema_adapter_factory(
- &self,
- schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
- ) -> Self {
Review Comment:
The schema adapter factory is no more (there is something called a physical
expression adapter factory that we may need to add support for later).
##########
rust/sedona-pointcloud/src/las/source.rs:
##########
@@ -81,26 +89,32 @@ impl FileSource for LasSource {
object_store: Arc<dyn ObjectStore>,
base_config: &FileScanConfig,
partition: usize,
- ) -> Arc<dyn FileOpener> {
- let projection = base_config
- .file_column_projection_indices()
- .unwrap_or_else(||
(0..base_config.projected_file_schema().fields().len()).collect());
-
+ ) -> Result<Arc<dyn FileOpener>, DataFusionError> {
let file_reader_factory = self
.reader_factory
.clone()
.unwrap_or_else(||
Arc::new(LasFileReaderFactory::new(object_store, None)));
- Arc::new(LasOpener {
- projection: Arc::from(projection),
+ let inner_opener: Arc<dyn FileOpener> = Arc::new(LasOpener {
batch_size: self.batch_size.expect("Must be set"),
limit: base_config.limit,
predicate: self.predicate.clone(),
file_reader_factory,
options: self.options.clone(),
partition_count:
base_config.output_partitioning().partition_count(),
partition,
- })
+ });
+
+ // Wrap with ProjectionOpener to handle reordering/expressions
+ if let Some(split_projection) = &self.split_projection {
+ ProjectionOpener::try_new(
+ split_projection.clone(),
+ inner_opener,
+ self.table_schema.file_schema(),
+ )
+ } else {
+ Ok(inner_opener)
+ }
Review Comment:
cc @b4l...this is to keep up with the changes for DataFusion 52. The current
tests pass but I didn't add new ones with more complex projections to check
(there are some for the sedona-datasource that did trigger a failure and this
approach worked there to fix the issue).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]