sdd commented on issue #405: URL: https://github.com/apache/iceberg-rust/issues/405#issuecomment-2325981291
I'm tackling items 2 and 3 from @liurenjie1024's initial comment on this issue. I've put together a design proposal that I'd like to share for feedback.

It involves creating a post-processor that can transform `RecordBatch`es coming from the `ParquetRecordBatchStream` before we send them back to the caller. If we detect that schema evolution requires us to promote or insert columns, we create an instance of this post-processor and append it to the processing pipeline. If it is not needed, the pipeline remains unchanged from its current configuration.

The post-processor could look like this:

```rust
use std::sync::Arc;

use arrow_array::{Array as ArrowArray, ArrayRef, RecordBatch};
use arrow_schema::{DataType, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use futures::Stream;
use once_cell::sync::OnceCell;

use crate::spec::Schema as IcebergSchema;
// plus the crate's `Result` and `ArrowRecordBatchStream` types

/// Represents an operation that may need to be performed
/// to transform a RecordBatch coming from a Parquet file record
/// batch stream so that it matches a newer Iceberg schema that has
/// evolved from the one that was used to write the Parquet file.
pub(crate) enum EvolutionOp {
    // Signifies that a particular column has undergone type promotion,
    // so the column with the given index needs to be promoted to the
    // specified type.
    Promote {
        index: usize,
        target_type: DataType,
    },

    // Signifies that a new column has been inserted before the column
    // with index `index`. (We choose "before" rather than "after" so
    // that we can use a usize; if we inserted after, we would need to
    // be able to store -1 here to indicate that the new column goes at
    // the front of the list.)
    // If multiple columns need to be inserted at a given location, they
    // should all be given the same index, as the index here refers to
    // the original record batch, not the interim state after a
    // preceding operation.
    Add {
        index: usize,
        target_type: DataType,
        value: Option<ArrayRef>, // a scalar default value for the new column
    },

    // Signifies that a column has been renamed from one schema to the
    // next. This requires no change to the data within a record batch,
    // only to its schema.
    Rename {
        index: usize,
        target_name: String,
    },

    // The Iceberg spec refers to other permissible schema evolution
    // actions (see https://iceberg.apache.org/spec/#schema-evolution):
    // deleting fields and reordering fields. However, these can be
    // achieved without this slower post-processing step by using the
    // projection mask.
}

pub(crate) struct RecordBatchEvolutionProcessor {
    operations: Vec<EvolutionOp>,

    // Every transformed RecordBatch will have the same schema. We create
    // the target schema just once and cache it here. Helpfully, an
    // Arc<Schema> is needed in the constructor for RecordBatch, so we
    // don't need an expensive copy each time.
    target_schema: OnceCell<Arc<ArrowSchema>>,

    // Caching any columns that we need to add is harder, as the number
    // of rows in the record batches can vary from batch to batch within
    // the stream, so rather than storing cached added columns here too,
    // we have to generate them on the fly.
}

impl RecordBatchEvolutionProcessor {
    /// Returns None if no processor is required.
    pub(crate) fn build(
        source_schema: &ArrowSchemaRef,
        snapshot_schema: &IcebergSchema,
        projected_iceberg_field_ids: &[i32],
    ) -> Option<Self> {
        let operations: Vec<_> = Self::generate_operations(
            source_schema,
            snapshot_schema,
            projected_iceberg_field_ids,
        );

        if operations.is_empty() {
            None
        } else {
            Some(Self {
                operations,
                target_schema: OnceCell::default(),
            })
        }
    }

    pub(crate) fn process_stream(
        &self,
        _source_stream: impl Stream<Item = Result<RecordBatch>>,
    ) -> Result<ArrowRecordBatchStream> {
        todo!()
    }

    pub(crate) fn process_record_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
        let new_batch_schema = self.target_schema.get_or_init(|| self.create_target_schema());

        Ok(RecordBatch::try_new(
            new_batch_schema.clone(),
            self.transform_columns(record_batch.columns()),
        )?)
    }

    fn generate_operations(
        source_schema: &ArrowSchemaRef,
        snapshot_schema: &IcebergSchema,
        projected_iceberg_field_ids: &[i32],
    ) -> Vec<EvolutionOp> {
        // Create the (possibly empty) list of `EvolutionOp`s that we need
        // to apply to the arrays in a record batch with `source_schema`
        // so that it matches the `snapshot_schema`.
        todo!()
    }

    fn transform_columns(&self, columns: &[Arc<dyn ArrowArray>]) -> Vec<Arc<dyn ArrowArray>> {
        // Iterate over `columns` and `self.operations`, populating a
        // Vec::with_capacity as we go.
        todo!()
    }

    fn create_target_schema(&self) -> Arc<ArrowSchema> {
        todo!()
    }
}
```
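To make the column handling concrete, here is a rough, untested sketch of how the body of `transform_columns` could apply these operations per batch. It leans on `arrow_cast::cast` for type promotion and `arrow_array::new_null_array` for added columns whose default is null; non-null defaults would need per-type array construction, which I've left as a `todo!()`. I've also made it return a `Result` here, since `cast` can fail:

```rust
use arrow_array::{new_null_array, ArrayRef};
use arrow_cast::cast;
use arrow_schema::ArrowError;

impl RecordBatchEvolutionProcessor {
    // Rough sketch only: apply the cached `EvolutionOp`s to the columns of a
    // single RecordBatch, producing the column list for the evolved schema.
    fn transform_columns(
        &self,
        columns: &[ArrayRef],
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        // All source columns have the same length; use it to size added columns.
        let num_rows = columns.first().map(|col| col.len()).unwrap_or(0);

        let mut result = Vec::with_capacity(columns.len() + self.operations.len());

        for (index, column) in columns.iter().enumerate() {
            // First, materialise any columns that the evolved schema inserts
            // before the source column at this index.
            for op in &self.operations {
                if let EvolutionOp::Add { index: add_at, target_type, value } = op {
                    if *add_at == index {
                        result.push(match value {
                            // A null default is just an all-null array of the
                            // right length and type.
                            None => new_null_array(target_type, num_rows),
                            // Non-null defaults need per-type array construction.
                            Some(_scalar) => todo!(),
                        });
                    }
                }
            }

            // Then emit the source column itself, casting it if it was promoted.
            let promotion = self.operations.iter().find_map(|op| match op {
                EvolutionOp::Promote { index: idx, target_type } if *idx == index => {
                    Some(target_type)
                }
                _ => None,
            });

            result.push(match promotion {
                // `cast` covers the promotions the spec allows
                // (e.g. int -> long, float -> double).
                Some(target_type) => cast(column.as_ref(), target_type)?,
                // Renames only touch the schema, so the data (like that of
                // untouched columns) passes through unchanged.
                None => column.clone(),
            });
        }

        // NOTE: columns appended after the last source column (add index ==
        // columns.len()) would need the same treatment here.

        Ok(result)
    }
}
```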
The Iceberg `ArrowReader` would be modified as follows:

```rust
impl ArrowReader {
    // ...

    async fn process_file_scan_task(...) {
        // ...

        // Create a RecordBatchEvolutionProcessor if our task schema contains
        // columns not present in the Parquet file, or columns whose types
        // have been promoted.
        let record_batch_evolution_processor = RecordBatchEvolutionProcessor::build(
            record_batch_stream_builder.schema(),
            task.schema(),
            task.project_field_ids(),
        );

        // ...

        // Build the batch stream and send all the RecordBatches that it
        // generates to the requester. Conceptually, `record_batch_stream`
        // is an `impl Stream<Item = Result<RecordBatch>>` from here on.
        let mut record_batch_stream = record_batch_stream_builder.build()?;

        if let Some(record_batch_evolution_processor) = record_batch_evolution_processor {
            record_batch_stream =
                record_batch_evolution_processor.process_stream(record_batch_stream)?;
        }

        while let Some(batch) = record_batch_stream.try_next().await? {
            tx.send(Ok(batch)).await?
        }

        Ok(())
    }

    // ...
}
```

@liurenjie1024, @Xuanwo: What do you think?
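P.S. One way `process_stream` could be implemented, again just an untested sketch: wrap the source stream with `StreamExt::map`, running each batch through `process_record_batch` as it arrives. I've changed the receiver from `&self` to `self` here so the processor can move into the closure without lifetime gymnastics, and I'm assuming `ArrowRecordBatchStream` is the crate's boxed stream of `Result<RecordBatch>`:

```rust
use futures::{Stream, StreamExt};

impl RecordBatchEvolutionProcessor {
    // Rough sketch only: lazily transform each RecordBatch as it is pulled
    // from the underlying Parquet record batch stream.
    pub(crate) fn process_stream(
        self,
        source_stream: impl Stream<Item = Result<RecordBatch>> + Send + 'static,
    ) -> Result<ArrowRecordBatchStream> {
        Ok(Box::pin(source_stream.map(move |batch| {
            // Propagate upstream errors untouched; transform successful batches.
            batch.and_then(|batch| self.process_record_batch(batch))
        })))
    }
}
```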