Copilot commented on code in PR #805:
URL: https://github.com/apache/sedona-db/pull/805#discussion_r3192592001
##########
rust/sedona-geoparquet/src/writer.rs:
##########
@@ -532,8 +574,184 @@ fn append_float_bbox(
Ok(())
}
+#[derive(Debug, PartialEq)]
+struct NormalizeForGeoParquet {
+ crs_provider: CrsProviderOption,
+ version: GeoParquetVersion,
+ signature: Signature,
+}
+
+impl NormalizeForGeoParquet {
+ fn new(crs_provider: CrsProviderOption, version: GeoParquetVersion) ->
Self {
+ Self {
+ crs_provider,
+ version,
+ signature: Signature::any(1, Volatility::Stable),
+ }
+ }
+}
+
+impl std::hash::Hash for NormalizeForGeoParquet {
+ fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+ self.version.hash(state);
+ self.signature.hash(state);
+ }
+}
+
+impl Eq for NormalizeForGeoParquet {}
+
+impl ScalarUDFImpl for NormalizeForGeoParquet {
+ fn as_any(&self) -> &dyn Any {
+ self
+ }
+
+ fn name(&self) -> &str {
+ "normalize_for_geoparquet"
+ }
+
+ fn signature(&self) -> &Signature {
+ &self.signature
+ }
+
+ fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+ sedona_internal_err!("return_type() should not be called")
+ }
+
+ fn return_field_from_args(&self, args: datafusion_expr::ReturnFieldArgs)
-> Result<FieldRef> {
+ normalize_field_for_geoparquet(&args.arg_fields[0], self.version,
&self.crs_provider)
+ }
+
+ fn invoke_with_args(&self, args: datafusion_expr::ScalarFunctionArgs) ->
Result<ColumnarValue> {
+ Ok(args.args[0].clone())
+ }
+}
+
+fn normalize_field_for_geoparquet(
+ field: &FieldRef,
+ version: GeoParquetVersion,
+ crs_provider: &CrsProviderOption,
+) -> Result<FieldRef> {
+ if field.metadata().is_empty() {
+ return Ok(field.clone());
+ }
+
+ let sedona_type = SedonaType::from_storage_field(field)?;
+ match sedona_type {
+ SedonaType::Arrow(DataType::Struct(children)) => {
+ let new_type = DataType::Struct(
+ children
+ .iter()
+ .map(|f| normalize_field_for_geoparquet(f, version,
crs_provider))
+ .collect::<Result<_>>()?,
+ );
+ Ok(Arc::new(field.as_ref().clone().with_data_type(new_type)))
+ }
+ SedonaType::Arrow(DataType::List(child)) => {
+ let new_type = DataType::List(normalize_field_for_geoparquet(
+ &child,
+ version,
+ crs_provider,
+ )?);
+ Ok(Arc::new(field.as_ref().clone().with_data_type(new_type)))
+ }
Review Comment:
`normalize_field_for_geoparquet()` returns early when the *current* field
has empty metadata, which prevents normalization of nested child fields (e.g.,
a Struct/List field with empty metadata whose children carry GeoArrow
metadata). This breaks cases like Sedona's item-level CRS wrapper (a Struct
with an `item` field that may hold a geospatial type). It can also cause
GeoArrow metadata to leak into GeoParquet 1.0/1.1 outputs, or cause CRS
normalization and its workarounds to be skipped for the 2.0 and `none`
versions. Consider removing the early return, or recursing into Struct/List
children regardless of the parent field's metadata and skipping work only once
you have confirmed that there is no nested metadata to normalize.
##########
rust/sedona-geoparquet/src/writer.rs:
##########
@@ -246,31 +251,27 @@ impl DataSink for GeoParquetSink {
data: SendableRecordBatchStream,
context: &Arc<TaskContext>,
) -> Result<u64> {
- if let Some(projection) = &self.projection {
- // If we have a projection, apply it here
- let schema = self.parquet_output_schema.clone();
- let projection = projection.clone();
-
- let data = Box::pin(RecordBatchStreamAdapter::new(
- schema.clone(),
- data.map(move |batch_result| {
- let schema = schema.clone();
-
- batch_result.and_then(|batch| {
- let mut columns = Vec::with_capacity(projection.len());
- for (expr, _) in &projection {
- let col = expr.evaluate(&batch)?;
- columns.push(col.into_array(batch.num_rows())?);
- }
- Ok(RecordBatch::try_new(schema.clone(), columns)?)
- })
- }),
- ));
+ // Apply the projection and write
+ let schema = self.parquet_output_schema.clone();
+ let projection = self.projection.clone();
+
+ let data = Box::pin(RecordBatchStreamAdapter::new(
+ schema.clone(),
+ data.map(move |batch_result| {
+ let schema = schema.clone();
+
+ batch_result.and_then(|batch| {
+ let mut columns = Vec::with_capacity(projection.len());
+ for (expr, _) in &projection {
+ let col = expr.evaluate(&batch)?;
+ columns.push(col.into_array(batch.num_rows())?);
+ }
+ Ok(RecordBatch::try_new(schema.clone(), columns)?)
+ })
+ }),
+ ));
Review Comment:
`GeoParquetSink::write_all()` always builds a new `RecordBatch` by
evaluating the full `projection`, even when the projection is effectively an
identity (e.g., GeoParquet 2.0 or `none` output where the fields already carry
the desired metadata and no bbox columns were added). This adds avoidable
per-batch CPU and memory overhead. Consider detecting an identity projection
and, in that case, delegating directly to `self.inner.write_all(data, ...)`.
##########
rust/sedona/src/context.rs:
##########
@@ -188,7 +188,22 @@ impl SedonaContext {
state_builder = state_builder.with_query_planner(Arc::new(planner));
let mut state = state_builder.build();
+
+ // Register GeoParquet and try to initialize our statistics
accumulator. It is OK if this fails
+ // because we already registered it, but we propagate other errors for
safety.
state.register_file_format(Arc::new(GeoParquetFormatFactory::new()),
true)?;
+ let init_result =
+
sedona_geoparquet::statistics_accumulator::SedonaGeoStatsAccumulatorFactory::try_init();
+ if let Err(init_err) = init_result {
+ if !matches!(init_err, DataFusionError::ParquetError(_))
+ || !init_err
+ .to_string()
+ .contains("GeoStatsAccumulatorFactory already set")
+ {
+ return Err(init_err);
+ }
+ }
Review Comment:
The check that tolerates an already-initialized GeoStats accumulator relies on
matching a substring of `init_err.to_string()` ("GeoStatsAccumulatorFactory
already set"). This is brittle across parquet/DataFusion versions: if the error
wording changes, context construction will fail unexpectedly. Prefer matching
the concrete error variant/details (e.g., the ParquetError kind), or exposing
(or using) an API that reports "already initialized" without string parsing.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]