blackmwk commented on code in PR #2477:
URL: https://github.com/apache/iceberg-rust/pull/2477#discussion_r3420251999
##########
crates/iceberg/src/arrow/timestamp_tz.rs:
##########
@@ -0,0 +1,537 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! UTC timestamp coercion for Arrow RecordBatches.
+//!
+//! Arrow engines may produce timestamps with timezone "UTC" while Iceberg's
+//! canonical Arrow schema uses "+00:00". This module handles the lossless cast
+//! between UTC-equivalent timezone representations so the parquet writer can
+//! accept data from either convention.
+//!
+//! Uses [`ArrowSchemaVisitor`] to walk the source batch schema and produce a
+//! coerced schema where UTC-equivalent timezones are normalized to match the
+//! target. This follows the same pattern as [`crate::arrow::int96`].
+
+use std::sync::Arc;
+
+use arrow_array::RecordBatch;
+use arrow_cast::cast;
+use arrow_schema::{
+    DataType, Field, FieldRef, Fields, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
+};
+
+use crate::arrow::schema::{ArrowSchemaVisitor, DEFAULT_MAP_FIELD_NAME, visit_schema};
+use crate::{Error, ErrorKind, Result};
+
+/// Coerce timestamp columns in `batch` to match `target_schema` when the only
+/// difference is a UTC-equivalent timezone alias (e.g. "UTC" vs "+00:00").
+pub(crate) fn coerce_timestamp_columns(
+    batch: &RecordBatch,
+    target_schema: &ArrowSchemaRef,
+) -> Result<RecordBatch> {
+    if batch.schema() == *target_schema {
+        return Ok(batch.clone());
+    }
+
+    let mut visitor = TimestampTzCoercionVisitor::new(target_schema);
+    let coerced_schema = Arc::new(visit_schema(&batch.schema(), &mut visitor)?);
+
+    if !visitor.changed {
+        return Ok(batch.clone());
+    }
+
+    let mut cols = batch.columns().to_vec();
+    for (idx, (col, target_field)) in cols.clone().iter().zip(coerced_schema.fields()).enumerate() {
+        if col.data_type() != target_field.data_type() {
+            cols[idx] = cast(col, target_field.data_type())?;
+        }
+    }
+
+    RecordBatch::try_new(coerced_schema, cols).map_err(|err| {
+        Error::new(
+            ErrorKind::DataInvalid,
+            "Failed to rebuild record batch after casting to target schema.",
+        )
+        .with_source(err)
+    })
+}
+
+/// Visitor that walks the source (batch) schema and produces a coerced schema
+/// where UTC-equivalent timestamp timezones are normalized to match the target.
+///
+/// For each primitive field, if the source has `Timestamp(unit, Some(tz))` and the
+/// target has the same unit but a different UTC-equivalent timezone, we output
+/// the target's timezone in the coerced schema.
+struct TimestampTzCoercionVisitor<'a> {
+    target_schema: &'a ArrowSchemaRef,
+    field_stack: Vec<FieldRef>,
+    target_field_stack: Vec<DataType>,
+    changed: bool,
+}
+
+impl<'a> TimestampTzCoercionVisitor<'a> {
+    fn new(target_schema: &'a ArrowSchemaRef) -> Self {
+        Self {
+            target_schema,
+            field_stack: Vec::new(),
+            target_field_stack: Vec::new(),
+            changed: false,
+        }
+    }
+
+    fn current_target_type(&self) -> Option<&DataType> {
+        self.target_field_stack.last()
+    }
+}
+
+impl ArrowSchemaVisitor for TimestampTzCoercionVisitor<'_> {

Review Comment:
I see two problems with this implementation:
1. It's quite inefficient: it makes many unnecessary clones of data types and fields.
2. I don't think we should silently ignore schema differences other than the timezone difference.

I just learned from this PR that Arrow supports casting from struct to struct, so we could replace this with a much simpler and more efficient algorithm:
1. Use [filter_leaves](https://docs.rs/arrow/latest/arrow/datatypes/struct.Fields.html#method.filter_leaves) to keep only the fields in the target schema whose leaves are UTC timestamp columns.
2. For each incoming record batch, if source_schema == target_schema, pass it through unchanged.
3. Otherwise, look up the fields filtered in step 1 in source_schema to see whether a conversion is needed. If so, do a cast.
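Below is a minimal sketch of the three steps above. It makes several assumptions. To keep it std-only and self-contained, `DataType`, `Field`, `collect_utc_leaves`, `leaf_mut`, `plan_batch`, and `Plan` are hypothetical stand-ins, not arrow-rs or iceberg-rust APIs. The set of UTC spellings in `is_utc_equivalent` is also an assumption. In real code, step 1 would presumably be `Fields::filter_leaves` on the target schema, and the conversion would be `arrow_cast::cast` of the affected top-level columns to the target field types (the struct-to-struct cast mentioned above). Any difference other than a UTC alias produces `Plan::Mismatch` instead of being ignored, which addresses point 2.

```rust
// Hypothetical, simplified stand-ins for arrow's `DataType` and `Field`.
#[derive(Clone, Debug, PartialEq)]
enum DataType {
    Int64,
    /// Microsecond timestamp with an optional timezone string.
    TimestampMicros(Option<String>),
    Struct(Vec<Field>),
}

#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
    data_type: DataType,
}

fn field(name: &str, data_type: DataType) -> Field {
    Field { name: name.to_string(), data_type }
}

/// Assumption: the timezone spellings treated as UTC.
fn is_utc_equivalent(tz: &str) -> bool {
    matches!(tz, "UTC" | "+00:00" | "Z" | "Etc/UTC")
}

/// Step 1 (once per target schema): index paths and timezones of UTC timestamp
/// leaves, playing the role of `Fields::filter_leaves`.
fn collect_utc_leaves(target: &[Field]) -> Vec<(Vec<usize>, String)> {
    fn walk(fields: &[Field], prefix: &mut Vec<usize>, out: &mut Vec<(Vec<usize>, String)>) {
        for (i, f) in fields.iter().enumerate() {
            prefix.push(i);
            match &f.data_type {
                DataType::Struct(children) => walk(children, prefix, out),
                DataType::TimestampMicros(Some(tz)) if is_utc_equivalent(tz) => {
                    out.push((prefix.clone(), tz.clone()))
                }
                _ => {}
            }
            prefix.pop();
        }
    }
    let mut out = Vec::new();
    walk(target, &mut Vec::new(), &mut out);
    out
}

/// Follows an index path down through structs to a (mutable) leaf type.
fn leaf_mut<'a>(fields: &'a mut [Field], path: &[usize]) -> Option<&'a mut DataType> {
    let (first, rest) = path.split_first()?;
    let dt = &mut fields.get_mut(*first)?.data_type;
    if rest.is_empty() {
        return Some(dt);
    }
    match dt {
        DataType::Struct(children) => leaf_mut(children, rest),
        _ => None,
    }
}

#[derive(Debug, PartialEq)]
enum Plan {
    /// Step 2: the batch schema already equals the target; pass it through.
    Unchanged,
    /// Only UTC-alias differences; these leaf paths need a cast.
    Cast(Vec<Vec<usize>>),
    /// Any other difference is reported, not silently ignored.
    Mismatch,
}

/// Steps 2 and 3 (per incoming batch schema).
fn plan_batch(source: &[Field], target: &[Field], utc_leaves: &[(Vec<usize>, String)]) -> Plan {
    if source == target {
        return Plan::Unchanged;
    }
    let mut normalized = source.to_vec();
    let mut to_cast = Vec::new();
    for (path, target_tz) in utc_leaves {
        match leaf_mut(&mut normalized, path) {
            Some(DataType::TimestampMicros(Some(tz))) if is_utc_equivalent(tz.as_str()) => {
                if tz.as_str() != target_tz.as_str() {
                    *tz = target_tz.clone();
                    to_cast.push(path.clone());
                }
            }
            _ => return Plan::Mismatch,
        }
    }
    // After rewriting UTC aliases, anything still different is a real mismatch.
    if normalized.as_slice() == target {
        Plan::Cast(to_cast)
    } else {
        Plan::Mismatch
    }
}
```

Step 1 runs once per writer. The per-batch check only clones the schema when it does not already equal the target, so the common case is a single equality comparison.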
