sdd commented on code in PR #602: URL: https://github.com/apache/iceberg-rust/pull/602#discussion_r1744602811
########## crates/iceberg/src/arrow/record_batch_evolution_processor.rs: ########## @@ -0,0 +1,408 @@ +use std::sync::Arc; + +use arrow::compute::cast; +use arrow_array::{ + Array as ArrowArray, ArrayRef, Datum as ArrowDatum, Float32Array, Float64Array, Int32Array, + Int64Array, RecordBatch, StringArray, +}; +use arrow_schema::{DataType, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef}; +use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + +use crate::arrow::schema_to_arrow_schema; +use crate::spec::{Literal, PrimitiveLiteral, Schema as IcebergSchema}; +use crate::{Error, ErrorKind, Result}; + +/// Represents an operation that may need to be performed +/// to transform a RecordBatch coming from a Parquet file record +/// batch stream to match a newer Iceberg schema that has evolved from +/// the one that was used to write the parquet file. +#[derive(Debug)] +pub(crate) struct EvolutionOp { + index: usize, + action: EvolutionAction, +} + +#[derive(Debug)] +pub(crate) enum EvolutionAction { + // signifies that a particular column has undergone type promotion, + // thus the column with the given index needs to be promoted to the + // specified type + Promote { + target_type: DataType, + }, + + // Signifies that a new column has been inserted before the row + // with index `index`. (we choose "before" rather than "after" so + // that we can use usize; if we insert after, then we need to + // be able to store -1 here when we want to indicate that the new + // column is to be added at the front of the list). + // If multiple columns need to be inserted at a given + // location, they should all be given the same index, as the index + // here refers to the original record batch, not the interim state after + // a preceding operation. + Add { + target_type: DataType, + value: Option<PrimitiveLiteral>, + }, + // The iceberg spec refers to other permissible schema evolution actions + // (see https://iceberg.apache.org/spec/#schema-evolution): + // renaming fields, deleting fields and reordering fields. + // Renames only affect the RecordBatch schema rather than the + // columns themselves, so a single updated cached schema can + // be re-used and no per-column actions are required. + // Deletion and Reorder can be achieved without needing this + // post-processing step by using the projection mask. Review Comment: TODO: the projection mask can't be used to modify the column order in the record batches. If we want the returned column order to match the order of the iceberg field_ids in the scan, we'll need to do that in `RecordBatchEvolutionProcessor` too. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org