zeodtr opened a new issue, #338:
URL: https://github.com/apache/iceberg-rust/issues/338

   Hi,
   I've been developing a query engine that uses the `iceberg-rust` crate. When checking Iceberg compatibility against org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.4.3, I didn't encounter any issues, at least not with my engine. However, when testing with org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.5.0, I did come across a few issues. I managed to address them, either through fixes or workarounds. Here's a summary of the issues encountered and the solutions applied:
   
   **Issue 1.**
   
   In the following scenario,
   
   1. CREATE TABLE with my engine - success
   2. INSERT INTO with my engine - success
   3. DELETE FROM with Spark - fails with the following message:
   ```
   java.lang.IllegalArgumentException: Not a list type: map<int, long>
           at org.apache.iceberg.types.Type.asListType(Type.java:75)
           at org.apache.iceberg.avro.AvroWithPartnerVisitor$FieldIDAccessors.listElementPartner(AvroWithPartnerVisitor.java:65)
           at org.apache.iceberg.avro.AvroWithPartnerVisitor$FieldIDAccessors.listElementPartner(AvroWithPartnerVisitor.java:40)
           at org.apache.iceberg.avro.AvroWithPartnerVisitor.visitArray(AvroWithPartnerVisitor.java:205)
   ```
   
   The reason behind this is that iceberg-rust doesn't include `"logicalType": "map"` in the Avro schema for Iceberg maps with non-string keys, which are represented as Avro arrays of key/value records.

   To address this, I applied the not-yet-official `apache_avro` 0.17 from GitHub and adjusted the iceberg-rust code to match the changed Avro Rust API. (Incidentally, that API change was made by an iceberg-rust developer, possibly to fix this kind of issue.) Then I added the logical type to the schema.
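
   For reference, this is roughly what the Avro schema for an Iceberg `map<int, long>` should look like once the logical type is written. The record name and field IDs here are illustrative, not taken from any particular table:

   ```json
   {
     "type": "array",
     "logicalType": "map",
     "items": {
       "type": "record",
       "name": "k101_v102",
       "fields": [
         { "name": "key", "type": "int", "field-id": 101 },
         { "name": "value", "type": "long", "field-id": 102 }
       ]
     }
   }
   ```

   Without the `"logicalType": "map"` annotation, a reader such as Iceberg Java sees a plain array and tries to match it against the expected map type, which is consistent with the `Not a list type: map<int, long>` error above.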
   
   **Issue 2.**
   
   In the following scenario,
   
   1. CREATE TABLE with my engine - success
   2. INSERT INTO with my engine - success
   3. DELETE FROM with Spark - fails with the following message:
   
   ```
   org.apache.iceberg.shaded.org.apache.avro.file.DataFileWriter$AppendWriteException: java.lang.NullPointerException
           at org.apache.iceberg.shaded.org.apache.avro.file.DataFileWriter.append(DataFileWriter.java:317)
           at org.apache.iceberg.avro.AvroFileAppender.add(AvroFileAppender.java:66)
           at org.apache.iceberg.ManifestListWriter.add(ManifestListWriter.java:45)
           at java.util.Arrays$ArrayList.forEach(Arrays.java:3880)
   ```
   
   Once I applied apache_avro 0.17 and started writing `field_id` to the Avro schema, this issue was resolved.
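
   For context, field IDs are carried as a `field-id` attribute on each record field in the Avro schema. An abridged sketch of the `manifest_file` record might look like this (the IDs shown are my reading of the Iceberg spec):

   ```json
   {
     "type": "record",
     "name": "manifest_file",
     "fields": [
       { "name": "manifest_path", "type": "string", "field-id": 500 },
       { "name": "manifest_length", "type": "long", "field-id": 501 }
     ]
   }
   ```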
   
   **Issue 3.**
   
   In the following scenario,
   
   1. CREATE TABLE with my engine - success
   2. INSERT INTO with Spark - success
   3. INSERT INTO with my engine - fails with the following message:
   
   ```
   DataInvalid => Failure in conversion with avro, source: Missing field in record: \"added_data_files_count\"
   ```
   
   This error is related to the Iceberg issue https://github.com/apache/iceberg/issues/8684 and to `iceberg-rust`'s inability to read an Avro file schema by `field_id`; it relies on field names instead.
   
   In the aforementioned Iceberg issue, an Iceberg Java developer discovered inconsistencies between the Java source code and the specification regarding the field names of the `manifest_file` struct. The source code was subsequently modified to align with the specification, so Iceberg Java's Avro writers started using different (correct) field names. This change didn't affect Iceberg Java, which reads the Avro schema by field_id rather than by field name. However, iceberg-rust reads the Avro schema by field name, causing the current issue.
   
   To address this, I examined the iceberg-rust and Avro Rust code. However, implementing the functionality to read by field_id seemed to require a fair amount of time (at least for me). As a temporary solution, after replacing all the incorrect field names in the code, I applied an ad hoc workaround in `manifest_list.rs`:
   
   ```rust
   const WRONG_FIELD_NAME_MAPS: &[(&str, &str)] = &[
       ("added_data_files_count", "added_files_count"),
       ("existing_data_files_count", "existing_files_count"),
       ("deleted_data_files_count", "deleted_files_count"),
   ];
   
   impl ManifestList {
       /// Parse manifest list from bytes.
       pub fn parse_with_version(
           bs: &[u8],
           version: FormatVersion,
           partition_type: &StructType,
       ) -> Result<ManifestList, Error> {
            // We cannot use Avro's schema resolution, so use Reader::new() instead of
            // Reader::with_schema(), and let from_value() check the schema's correctness.
            let reader = Reader::new(bs)?;
           let wrong_field_name_found = match reader.writer_schema() {
               Schema::Record(record_schema) => record_schema
                   .lookup
                   .contains_key(WRONG_FIELD_NAME_MAPS[0].0),
               _ => false,
           };
           let values = reader.collect::<Result<Vec<Value>, _>>()?;
           let values = if wrong_field_name_found {
               values
                   .into_iter()
                   .map(|value| match value {
                       Value::Record(fields) => {
                           let new_fields = fields
                               .into_iter()
                               .map(|field| {
                                   for map in WRONG_FIELD_NAME_MAPS {
                                       if map.0 == field.0 {
                                           return (map.1.to_string(), field.1);
                                       }
                                   }
                                   field
                               })
                               .collect::<Vec<_>>();
                           Value::Record(new_fields)
                       }
                       _ => value,
                   })
                   .collect::<Vec<_>>()
           } else {
               values
           };
           let values = Value::Array(values);
            match version {
                FormatVersion::V1 => {
                    from_value::<_serde::ManifestListV1>(&values)?.try_into(partition_type)
                }
                FormatVersion::V2 => {
                    from_value::<_serde::ManifestListV2>(&values)?.try_into(partition_type)
                }
            }
       }
   
       /// Get the entries in the manifest list.
       pub fn entries(&self) -> &[ManifestListEntry] {
           &self.entries
       }
   }
   ```
   
   It essentially replaces the 'wrong' field names with the correct ones. I perceive this as more of a workaround than a solution, but it serves its purpose for the time being. It would be nice if a more fundamental fix could be implemented in the future, such as reading the schema by field_id.
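
   To illustrate why field_id-based resolution sidesteps the rename: the renamed fields keep their IDs, so a reader matching on `field-id` resolves them identically under either the old or the new name. Per my reading of the Iceberg spec, the three affected `manifest_file` fields are:

   ```json
   [
     { "name": "added_files_count",    "type": "int", "field-id": 504 },
     { "name": "existing_files_count", "type": "int", "field-id": 505 },
     { "name": "deleted_files_count",  "type": "int", "field-id": 506 }
   ]
   ```

   An older writer would emit the same IDs under the `*_data_files_count` names, which is exactly the case the workaround above detects and renames.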
   
   Thank you.
   
   
   
   

