hugokitano opened a new issue, #1202:
URL: https://github.com/apache/iceberg-rust/issues/1202

   ### Apache Iceberg Rust version
   
   0.4.0 (latest version)
   
   ### Describe the bug
   
   The Apache Iceberg Rust Parquet writer requires a unique `PARQUET:field_id` metadata entry on every Arrow `Field`. The complex type `map` is made up of several nested `Field`s that must each have this metadata assigned, yet upon writing a record batch, the metadata on the outer `Map` field is lost.
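
   For clarity, the failing shape is an Arrow map whose outer field carries its own field-id metadata. This is a condensed sketch of the construction from the full reproduction below (`field_id` is just a local helper):

   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   use arrow_schema::{DataType, Field, Fields};
   use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

   fn field_id(id: i32) -> HashMap<String, String> {
       HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), id.to_string())])
   }

   fn main() {
       // The key, the value, the entries struct, and the outer map each carry
       // a distinct PARQUET:field_id.
       let key = Field::new("key", DataType::Int64, false).with_metadata(field_id(1));
       let value = Field::new("value", DataType::Float64, true).with_metadata(field_id(2));
       let entries = Field::new("struct", DataType::Struct(Fields::from(vec![key, value])), false)
           .with_metadata(field_id(3));
       let map = Field::new("map", DataType::Map(Arc::new(entries), false), true)
           .with_metadata(field_id(4)); // the "4" entry is what goes missing on write
       println!("{map:#?}");
   }
   ```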
   
   ### To Reproduce
   
   ```rust
   use std::collections::HashMap;
   use std::sync::Arc;

   use arrow_array::{ArrayRef, Float64Array, Int64Array, RecordBatch, StructArray};
   use arrow_buffer::OffsetBuffer;
   use arrow_schema::{Field, Schema as ArrowSchema};
   use iceberg::transaction::Transaction;
   use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
   use iceberg::writer::file_writer::location_generator::{
       DefaultFileNameGenerator, DefaultLocationGenerator,
   };
   use iceberg::writer::file_writer::ParquetWriterBuilder;
   use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
   use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
   use uuid::Uuid;

   #[tokio::main]
   async fn main() -> Result<(), Box<dyn std::error::Error>> {
       run_iceberg_write().await
   }
   
   async fn run_iceberg_write() -> Result<(), Box<dyn std::error::Error>> {
       use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
       // Create an in-memory file IO
       let file_io = iceberg::io::FileIOBuilder::new("memory").build()?;
       let namespace_name = "test_namespace";
       let table_name = "test_table";
   
       // Create the key and value fields WITH metadata
       let mut key_metadata = HashMap::new();
       key_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string());
       let key_field = Field::new("key", arrow_schema::DataType::Int64, false)
           .with_metadata(key_metadata);
   
       let mut value_metadata = HashMap::new();
       value_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string());
       let value_field = Field::new("value", arrow_schema::DataType::Float64, true)
           .with_metadata(value_metadata);
   
       // Create the key_value field with metadata
       let mut key_value_metadata = HashMap::new();
       key_value_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "3".to_string());
       let key_value_field = Field::new(
           "struct",
           arrow_schema::DataType::Struct(vec![
               Arc::new(key_field.clone()),
               Arc::new(value_field.clone()),
           ].into()),
           false
       ).with_metadata(key_value_metadata.clone());
   
       // Create the map field with metadata
       let mut map_metadata = HashMap::new();
       map_metadata.insert(PARQUET_FIELD_ID_META_KEY.to_string(), "4".to_string());
       let map_field = Field::new(
           "map",
           arrow_schema::DataType::Map(Arc::new(key_value_field.clone()), false),
           true
       ).with_metadata(map_metadata);
   
       // Create schema with the map field
       let schema = ArrowSchema::new(vec![map_field]);
   
       // Create test data for the map
       let map_keys = Int64Array::from(vec![101, 102, 201, 202]);
       let map_values = Float64Array::from(vec![Some(10.1), Some(10.2), Some(20.1), Some(20.2)]);
   
       // Create the struct array
       let map_struct_array = StructArray::new(
           arrow_schema::Fields::from(vec![
               Arc::new(key_field.clone()),
               Arc::new(value_field.clone()),
           ]),
           vec![Arc::new(map_keys), Arc::new(map_values)],
           None
       );
   
       // Define offsets for the map
       let map_offsets = OffsetBuffer::<i32>::new(arrow_buffer::ScalarBuffer::from(vec![0, 2, 4]));
   
       // Create the map array
       let map_array = arrow_array::MapArray::new(
           Arc::new(key_value_field.clone()),
           map_offsets,
           map_struct_array,
           None,
           false
       );
   
       // Create record batch
       let batch = RecordBatch::try_new(
           Arc::new(schema.clone()),
           vec![Arc::new(map_array) as ArrayRef],
       )?;
       println!("batch with schema {}", batch.schema());
   
       // Create an in-memory catalog
       let catalog = iceberg_catalog_memory::MemoryCatalog::new(
           file_io.clone(),
           Some("memory://warehouse".to_string()),
       );
   
       // Create the namespace and table
       let namespace = NamespaceIdent::new(namespace_name.to_string());
       catalog.create_namespace(&namespace, HashMap::new()).await?;
   
       let table_ident = TableIdent::new(namespace.clone(), table_name.to_string());
   
       // Convert schema to Iceberg schema
       let iceberg_schema = iceberg::spec::Schema::try_from(&schema)?;
   
       let table = catalog.create_table(&namespace, TableCreation::builder()
           .name(table_name.to_string())
           .schema(iceberg_schema.clone())
           .properties(HashMap::new())
           .build()).await?;
   
       // Create location generator
       let location_generator = DefaultLocationGenerator::new(table.metadata().clone())?;
   
       // Create file name generator
       let writer_id = format!("file-{}", Uuid::new_v4());
       let file_name_generator = DefaultFileNameGenerator::new(
           writer_id,
           None,
           iceberg::spec::DataFileFormat::Parquet
       );
   
       // Create Parquet writer builder
       let parquet_props = parquet::file::properties::WriterProperties::builder().build();
       let parquet_writer_builder = ParquetWriterBuilder::new(
           parquet_props,
           Arc::new(iceberg_schema),
           file_io.clone(),
           location_generator,
           file_name_generator,
       );
   
       // Create data file writer
       let data_file_writer_builder = DataFileWriterBuilder::new(parquet_writer_builder, None);
       let mut writer = data_file_writer_builder.build().await?;
   
       // Write the batch
       writer.write(batch).await?;
   
       // Close the writer and get data files
       let data_files = writer.close().await?;
   
       // Create a transaction to append data files
       let tx = Transaction::new(&table);
       let mut fast_append = tx.fast_append(None, vec![])?;
       fast_append.add_data_files(data_files)?;
       fast_append.apply().await?;
   
       println!("Successfully wrote table!");
   
       Ok(())
   }
   ```
   
   produces the output:
   ```
   batch with schema Field { name: "map", data_type: Map(Field { name: "struct", data_type: Struct([Field { name: "key", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, false), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "4"} }
   Error: Unexpected => Failed to write using parquet writer.
   
   Source: Arrow: Incompatible type. Field 'map' has type Map(Field { name: "key_value", data_type: Struct([Field { name: "key", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, false), array has type Map(Field { name: "struct", data_type: Struct([Field { name: "key", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "1"} }, Field { name: "value", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "2"} }]), nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {"PARQUET:field_id": "3"} }, false)
   ```
   Notice that the `"PARQUET:field_id": "4"` metadata entry is missing from the expected `Map` type in the error message, even though it is definitely present in the record batch's schema. (The expected type also renames the entries field from `"struct"` to `"key_value"` and drops its `"3"` field-id metadata entirely.)
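
   For what it's worth, the writer-side expectation can be inspected by converting the Iceberg schema back to Arrow right after the `Schema::try_from(&schema)?` call in the reproduction. A minimal sketch, assuming `iceberg::arrow::schema_to_arrow_schema` is the public conversion in 0.4.0 (treat the exact path as my guess):

   ```rust
   use iceberg::arrow::schema_to_arrow_schema;

   // Slots in after `let iceberg_schema = iceberg::spec::Schema::try_from(&schema)?;`
   // above: print the Arrow schema the Parquet writer validates incoming
   // batches against.
   let writer_side = schema_to_arrow_schema(&iceberg_schema)?;
   println!("writer-side schema: {}", writer_side);
   // Based on the quoted error, this should show the map's entries field as
   // name: "key_value" with metadata: {}, rather than name: "struct" with
   // {"PARQUET:field_id": "3"} as constructed in the batch.
   ```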
   
   
   
   ### Expected behavior
   
   _No response_
   
   ### Willingness to contribute
   
   None

