hugokitano opened a new issue, #1202: URL: https://github.com/apache/iceberg-rust/issues/1202
### Apache Iceberg Rust version 0.4.0 (latest version) ### Describe the bug The apache rust map writer requires unique metadata fields for every Field. The complex type `map` contains multiple Fields that must have metadata assigned, yet upon writing a record batch, the metadata for the outer `Map` field is lost ### To Reproduce ``` #[tokio::main] async fn main() -> Result<(), Box<dyn std::error::Error>> { run_iceberg_write().await } async fn run_iceberg_write() -> Result<(), Box<dyn std::error::Error>> { use parquet::arrow::PARQUET_FIELD_ID_META_KEY; // Create an in-memory file IO let file_io = iceberg::io::FileIOBuilder::new("memory").build()?; let namespace_name = "test_namespace"; let table_name = "test_table"; // Create the key and value fields WITH metadata let mut key_metadata = HashMap::new(); key_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string()); let key_field = Field::new("key", arrow_schema::DataType::Int64, false) .with_metadata(key_metadata); let mut value_metadata = HashMap::new(); value_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string()); let value_field = Field::new("value", arrow_schema::DataType::Float64, true) .with_metadata(value_metadata); // Create the key_value field with metadata let mut key_value_metadata = HashMap::new(); key_value_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string()); let key_value_field = Field::new( "struct", arrow_schema::DataType::Struct(vec![ Arc::new(key_field.clone()), Arc::new(value_field.clone()), ].into()), false ).with_metadata(key_value_metadata.clone()); // Create the map field with metadata let mut map_metadata = HashMap::new(); map_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string()); let map_field = Field::new( "map", arrow_schema::DataType::Map(Arc::new(key_value_field.clone()), false), true ).with_metadata(map_metadata); // Create schema with the map field let schema = ArrowSchema::new(vec![map_field]); // Create test data for the map let map_keys = Int64Array::from(vec![101, 102, 201, 202]); let map_values = Float64Array::from(vec![Some(10.1), Some(10.2), Some(20.1), Some(20.2)]); // Create the struct array let map_struct_array = StructArray::new( arrow_schema::Fields::from(vec![ Arc::new(key_field.clone()), Arc::new(value_field.clone()), ]), vec![Arc::new(map_keys), Arc::new(map_values)], None ); // Define offsets for the map let map_offsets = OffsetBuffer::<i32>::new(arrow_buffer::ScalarBuffer::from(vec![0, 2, 4])); // Create the map array let map_array = arrow_array::MapArray::new( Arc::new(key_value_field.clone()), map_offsets, map_struct_array, None, false ); // Create record batch let batch = RecordBatch::try_new( Arc::new(schema.clone()), vec![Arc::new(map_array) as ArrayRef], )?; println!("batch with schema {}", batch.schema()); // Create an in-memory catalog let catalog = iceberg_catalog_memory::MemoryCatalog::new(file_io.clone(), Some("memory://warehouse".to_string())); // Create the namespace and table let namespace = NamespaceIdent::new(namespace_name.to_string()); catalog.create_namespace(&namespace, HashMap::new()).await?; let table_ident = TableIdent::new(namespace.clone(), table_name.to_string()); // Convert schema to Iceberg schema let iceberg_schema = iceberg::spec::Schema::try_from(&schema)?; let table = catalog.create_table(&namespace, TableCreation::builder() .name(table_name.to_string()) .schema(iceberg_schema.clone()) .properties(HashMap::new()) .build()).await?; // Create location generator let location_generator = DefaultLocationGenerator::new(table.metadata().clone())?; // Create file name generator let writer_id = format!("file-{}", Uuid::new_v4()); let file_name_generator = DefaultFileNameGenerator::new( writer_id, None, iceberg::spec::DataFileFormat::Parquet ); // Create Parquet writer builder let parquet_props = parquet::file::properties::WriterProperties::builder().build(); let parquet_writer_builder = ParquetWriterBuilder::new( parquet_props, Arc::new(iceberg_schema), file_io.clone(), location_generator, file_name_generator, ); // Create data file writer let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None); let mut writer = data_file_writer_builder.build().await?; // Write the batch writer.write(batch).await?; // Close the writer and get data files let data_files = writer.close().await?; // Create a transaction to append data files let tx = Transaction::new(&table); let mut fast_append = tx.fast_append(None, vec![])?; fast_append.add_data_files(data_files)?; fast_append.apply().await?; println!("Successfully wrote table!"); Ok(()) } ``` produces the output ``` batch with schema Field { name: "map", data_type: Map(Field { name: "struct", data_type: Struct([Field { name: "key", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} } Error: Unexpected => Failed to write using parquet writer. Source: Arrow: Incompatible type. Field 'map' has type Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), array has type Map(Field { name: "struct", data_type: Struct([Field { name: "key", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, false) ``` Notice metadata value "4" is missing from the Map datatype in the error message, but is definitely present in the record batch. ### Expected behavior _No response_ ### Willingness to contribute None -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org