liurenjie1024 commented on code in PR #258:
URL: https://github.com/apache/iceberg-rust/pull/258#discussion_r1531450892


##########
crates/iceberg/src/arrow.rs:
##########
@@ -106,3 +114,732 @@ impl ArrowReader {
         ProjectionMask::all()
     }
 }
+
+/// A post order arrow schema visitor.
+///
+/// For order of methods called, please refer to [`visit_schema`].
+pub trait ArrowSchemaVisitor {
+    /// Return type of this visitor on arrow field.
+    type T;
+
+    /// Return type of this visitor on arrow schema.
+    type U;
+
+    /// Called before struct/list/map field.
+    fn before_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after struct/list/map field.
+    fn after_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before list element.
+    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after list element.
+    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map key.
+    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map key.
+    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map value.
+    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map value.
+    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after schema's type visited.
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U>;
+
+    /// Called after struct's fields visited.
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T>;
+
+    /// Called after list fields visited.
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;
+
+    /// Called after map's key and value fields visited.
+    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> 
Result<Self::T>;
+
+    /// Called when see a primitive type.
+    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
+}
+
+/// Visiting a type in post order.
+fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> 
Result<V::T> {
+    match r#type {
+        p if p.is_primitive()
+            || matches!(
+                p,
+                DataType::Boolean
+                    | DataType::Utf8
+                    | DataType::LargeUtf8
+                    | DataType::Binary
+                    | DataType::LargeBinary
+                    | DataType::FixedSizeBinary(_)
+            ) =>
+        {
+            visitor.primitive(p)
+        }
+        DataType::List(element_field) => visit_list(r#type, element_field, 
visitor),
+        DataType::LargeList(element_field) => visit_list(r#type, 
element_field, visitor),
+        DataType::FixedSizeList(element_field, _) => visit_list(r#type, 
element_field, visitor),
+        DataType::Map(field, _) => match field.data_type() {
+            DataType::Struct(fields) => {
+                if fields.len() != 2 {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Map field must have exactly 2 fields",
+                    ));
+                }
+
+                let key_field = &fields[0];
+                let value_field = &fields[1];
+
+                let key_result = {
+                    visitor.before_map_key(key_field)?;
+                    let ret = visit_type(key_field.data_type(), visitor)?;
+                    visitor.after_map_key(key_field)?;
+                    ret
+                };
+
+                let value_result = {
+                    visitor.before_map_value(value_field)?;
+                    let ret = visit_type(value_field.data_type(), visitor)?;
+                    visitor.after_map_value(value_field)?;
+                    ret
+                };
+
+                visitor.map(r#type, key_result, value_result)
+            }
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Map field must have struct type",
+            )),
+        },
+        DataType::Struct(fields) => visit_struct(fields, visitor),
+        other => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Cannot visit Arrow data type: {other}"),
+        )),
+    }
+}
+
+/// Visit list types in post order.
+#[allow(dead_code)]
+fn visit_list<V: ArrowSchemaVisitor>(
+    data_type: &DataType,
+    element_field: &Field,
+    visitor: &mut V,
+) -> Result<V::T> {
+    visitor.before_list_element(element_field)?;
+    let value = visit_type(element_field.data_type(), visitor)?;
+    visitor.after_list_element(element_field)?;
+    visitor.list(data_type, value)
+}
+
+/// Visit struct type in post order.
+#[allow(dead_code)]
+fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> 
Result<V::T> {
+    let mut results = Vec::with_capacity(fields.len());
+    for field in fields {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+
+    visitor.r#struct(fields, results)
+}
+
+/// Visit schema in post order.
+#[allow(dead_code)]
+fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) 
-> Result<V::U> {
+    let mut results = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+    visitor.schema(schema, results)
+}
+
+/// Convert Arrow schema to Iceberg schema.
+#[allow(dead_code)]
+pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
+    let mut visitor = ArrowSchemaConverter::new();
+    visit_schema(schema, &mut visitor)
+}
+
+const ARROW_FIELD_ID_KEY: &str = "PARQUET:field_id";
+const ARROW_FIELD_DOC_KEY: &str = "doc";
+
+fn get_field_id(field: &Field) -> Result<i32> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) {
+        return value.parse::<i32>().map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Failed to parse field id: {e}"),
+            )
+        });
+    }
+    Err(Error::new(
+        ErrorKind::DataInvalid,
+        "Field id not found in metadata",
+    ))
+}
+
+fn get_field_doc(field: &Field) -> Option<String> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
+        return Some(value.clone());
+    }
+    None
+}
+
+struct ArrowSchemaConverter {}
+
+impl ArrowSchemaConverter {
+    #[allow(dead_code)]
+    fn new() -> Self {
+        Self {}
+    }
+
+    fn convert_fields(fields: &Fields, field_results: &[Type]) -> 
Result<Vec<NestedFieldRef>> {
+        let mut results = Vec::with_capacity(fields.len());
+        for i in 0..fields.len() {
+            let field = &fields[i];
+            let field_type = &field_results[i];
+            let id = get_field_id(field)?;
+            let doc = get_field_doc(field);
+            let nested_field = NestedField {
+                id,
+                doc,
+                name: field.name().clone(),
+                required: !field.is_nullable(),
+                field_type: Box::new(field_type.clone()),
+                initial_default: None,
+                write_default: None,
+            };
+            results.push(Arc::new(nested_field));
+        }
+        Ok(results)
+    }
+}
+
+impl ArrowSchemaVisitor for ArrowSchemaConverter {
+    type T = Type;
+    type U = Schema;
+
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U> {
+        let fields = Self::convert_fields(schema.fields(), &values)?;
+        let builder = Schema::builder().with_fields(fields);
+        builder.build()
+    }
+
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T> {
+        let fields = Self::convert_fields(fields, &results)?;
+        Ok(Type::Struct(StructType::new(fields)))
+    }
+
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
+        let element_field = match list {
+            DataType::List(element_field) => element_field,
+            DataType::LargeList(element_field) => element_field,
+            DataType::FixedSizeList(element_field, _) => element_field,
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "List type must have list data type",
+                ))
+            }
+        };
+
+        let id = get_field_id(element_field)?;
+        let doc = get_field_doc(element_field);
+        let element_field = Arc::new(NestedField {
+            id,
+            doc,

Review Comment:
   How about using the `NestedField::list_element` method here?



##########
crates/iceberg/src/arrow.rs:
##########
@@ -106,3 +114,732 @@ impl ArrowReader {
         ProjectionMask::all()
     }
 }
+
+/// A post order arrow schema visitor.
+///
+/// For order of methods called, please refer to [`visit_schema`].
+pub trait ArrowSchemaVisitor {
+    /// Return type of this visitor on arrow field.
+    type T;
+
+    /// Return type of this visitor on arrow schema.
+    type U;
+
+    /// Called before struct/list/map field.
+    fn before_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after struct/list/map field.
+    fn after_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before list element.
+    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after list element.
+    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map key.
+    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map key.
+    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map value.
+    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map value.
+    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after schema's type visited.
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U>;
+
+    /// Called after struct's fields visited.
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T>;
+
+    /// Called after list fields visited.
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;
+
+    /// Called after map's key and value fields visited.
+    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> 
Result<Self::T>;
+
+    /// Called when see a primitive type.
+    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
+}
+
+/// Visiting a type in post order.
+fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> 
Result<V::T> {
+    match r#type {
+        p if p.is_primitive()
+            || matches!(
+                p,
+                DataType::Boolean
+                    | DataType::Utf8
+                    | DataType::LargeUtf8
+                    | DataType::Binary
+                    | DataType::LargeBinary
+                    | DataType::FixedSizeBinary(_)
+            ) =>
+        {
+            visitor.primitive(p)
+        }
+        DataType::List(element_field) => visit_list(r#type, element_field, 
visitor),
+        DataType::LargeList(element_field) => visit_list(r#type, 
element_field, visitor),
+        DataType::FixedSizeList(element_field, _) => visit_list(r#type, 
element_field, visitor),
+        DataType::Map(field, _) => match field.data_type() {
+            DataType::Struct(fields) => {
+                if fields.len() != 2 {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Map field must have exactly 2 fields",
+                    ));
+                }
+
+                let key_field = &fields[0];
+                let value_field = &fields[1];
+
+                let key_result = {
+                    visitor.before_map_key(key_field)?;
+                    let ret = visit_type(key_field.data_type(), visitor)?;
+                    visitor.after_map_key(key_field)?;
+                    ret
+                };
+
+                let value_result = {
+                    visitor.before_map_value(value_field)?;
+                    let ret = visit_type(value_field.data_type(), visitor)?;
+                    visitor.after_map_value(value_field)?;
+                    ret
+                };
+
+                visitor.map(r#type, key_result, value_result)
+            }
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Map field must have struct type",
+            )),
+        },
+        DataType::Struct(fields) => visit_struct(fields, visitor),
+        other => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Cannot visit Arrow data type: {other}"),
+        )),
+    }
+}
+
+/// Visit list types in post order.
+#[allow(dead_code)]
+fn visit_list<V: ArrowSchemaVisitor>(
+    data_type: &DataType,
+    element_field: &Field,
+    visitor: &mut V,
+) -> Result<V::T> {
+    visitor.before_list_element(element_field)?;
+    let value = visit_type(element_field.data_type(), visitor)?;
+    visitor.after_list_element(element_field)?;
+    visitor.list(data_type, value)
+}
+
+/// Visit struct type in post order.
+#[allow(dead_code)]
+fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> 
Result<V::T> {
+    let mut results = Vec::with_capacity(fields.len());
+    for field in fields {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+
+    visitor.r#struct(fields, results)
+}
+
+/// Visit schema in post order.
+#[allow(dead_code)]
+fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) 
-> Result<V::U> {
+    let mut results = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+    visitor.schema(schema, results)
+}
+
+/// Convert Arrow schema to Iceberg schema.
+#[allow(dead_code)]
+pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
+    let mut visitor = ArrowSchemaConverter::new();
+    visit_schema(schema, &mut visitor)
+}
+
+const ARROW_FIELD_ID_KEY: &str = "PARQUET:field_id";
+const ARROW_FIELD_DOC_KEY: &str = "doc";
+
+fn get_field_id(field: &Field) -> Result<i32> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) {
+        return value.parse::<i32>().map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Failed to parse field id: {e}"),
+            )

Review Comment:
   ```suggestion
               Error::new(
                   ErrorKind::DataInvalid,
                   "Failed to parse field id",
               ).with_context("value", value)
               .with_source(e)
   ```
   
   This will make error reporting easier to read.



##########
crates/iceberg/src/arrow.rs:
##########
@@ -106,3 +114,732 @@ impl ArrowReader {
         ProjectionMask::all()
     }
 }
+
+/// A post order arrow schema visitor.
+///
+/// For order of methods called, please refer to [`visit_schema`].
+pub trait ArrowSchemaVisitor {
+    /// Return type of this visitor on arrow field.
+    type T;
+
+    /// Return type of this visitor on arrow schema.
+    type U;
+
+    /// Called before struct/list/map field.
+    fn before_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after struct/list/map field.
+    fn after_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before list element.
+    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after list element.
+    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map key.
+    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map key.
+    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map value.
+    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map value.
+    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after schema's type visited.
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U>;
+
+    /// Called after struct's fields visited.
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T>;
+
+    /// Called after list fields visited.
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;
+
+    /// Called after map's key and value fields visited.
+    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> 
Result<Self::T>;
+
+    /// Called when see a primitive type.
+    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
+}
+
+/// Visiting a type in post order.
+fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> 
Result<V::T> {
+    match r#type {
+        p if p.is_primitive()
+            || matches!(
+                p,
+                DataType::Boolean
+                    | DataType::Utf8
+                    | DataType::LargeUtf8
+                    | DataType::Binary
+                    | DataType::LargeBinary
+                    | DataType::FixedSizeBinary(_)
+            ) =>
+        {
+            visitor.primitive(p)
+        }
+        DataType::List(element_field) => visit_list(r#type, element_field, 
visitor),
+        DataType::LargeList(element_field) => visit_list(r#type, 
element_field, visitor),
+        DataType::FixedSizeList(element_field, _) => visit_list(r#type, 
element_field, visitor),
+        DataType::Map(field, _) => match field.data_type() {
+            DataType::Struct(fields) => {
+                if fields.len() != 2 {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Map field must have exactly 2 fields",
+                    ));
+                }
+
+                let key_field = &fields[0];
+                let value_field = &fields[1];
+
+                let key_result = {
+                    visitor.before_map_key(key_field)?;
+                    let ret = visit_type(key_field.data_type(), visitor)?;
+                    visitor.after_map_key(key_field)?;
+                    ret
+                };
+
+                let value_result = {
+                    visitor.before_map_value(value_field)?;
+                    let ret = visit_type(value_field.data_type(), visitor)?;
+                    visitor.after_map_value(value_field)?;
+                    ret
+                };
+
+                visitor.map(r#type, key_result, value_result)
+            }
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Map field must have struct type",
+            )),
+        },
+        DataType::Struct(fields) => visit_struct(fields, visitor),
+        other => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Cannot visit Arrow data type: {other}"),
+        )),
+    }
+}
+
+/// Visit list types in post order.
+#[allow(dead_code)]
+fn visit_list<V: ArrowSchemaVisitor>(
+    data_type: &DataType,
+    element_field: &Field,
+    visitor: &mut V,
+) -> Result<V::T> {
+    visitor.before_list_element(element_field)?;
+    let value = visit_type(element_field.data_type(), visitor)?;
+    visitor.after_list_element(element_field)?;
+    visitor.list(data_type, value)
+}
+
+/// Visit struct type in post order.
+#[allow(dead_code)]
+fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> 
Result<V::T> {
+    let mut results = Vec::with_capacity(fields.len());
+    for field in fields {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+
+    visitor.r#struct(fields, results)
+}
+
+/// Visit schema in post order.
+#[allow(dead_code)]
+fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) 
-> Result<V::U> {
+    let mut results = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+    visitor.schema(schema, results)
+}
+
+/// Convert Arrow schema to Iceberg schema.
+#[allow(dead_code)]
+pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
+    let mut visitor = ArrowSchemaConverter::new();
+    visit_schema(schema, &mut visitor)
+}
+
+const ARROW_FIELD_ID_KEY: &str = "PARQUET:field_id";
+const ARROW_FIELD_DOC_KEY: &str = "doc";
+
+fn get_field_id(field: &Field) -> Result<i32> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) {
+        return value.parse::<i32>().map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Failed to parse field id: {e}"),
+            )
+        });
+    }
+    Err(Error::new(
+        ErrorKind::DataInvalid,
+        "Field id not found in metadata",
+    ))
+}
+
+fn get_field_doc(field: &Field) -> Option<String> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
+        return Some(value.clone());
+    }
+    None
+}
+
+struct ArrowSchemaConverter {}

Review Comment:
   ```suggestion
   struct ArrowSchemaConverter;
   ```



##########
crates/iceberg/src/arrow.rs:
##########
@@ -106,3 +114,732 @@ impl ArrowReader {
         ProjectionMask::all()
     }
 }
+
+/// A post order arrow schema visitor.
+///
+/// For order of methods called, please refer to [`visit_schema`].
+pub trait ArrowSchemaVisitor {
+    /// Return type of this visitor on arrow field.
+    type T;
+
+    /// Return type of this visitor on arrow schema.
+    type U;
+
+    /// Called before struct/list/map field.
+    fn before_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after struct/list/map field.
+    fn after_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before list element.
+    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after list element.
+    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map key.
+    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map key.
+    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map value.
+    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map value.
+    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after schema's type visited.
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U>;
+
+    /// Called after struct's fields visited.
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T>;
+
+    /// Called after list fields visited.
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;
+
+    /// Called after map's key and value fields visited.
+    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> 
Result<Self::T>;
+
+    /// Called when see a primitive type.
+    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
+}
+
+/// Visiting a type in post order.
+fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> 
Result<V::T> {
+    match r#type {
+        p if p.is_primitive()
+            || matches!(
+                p,
+                DataType::Boolean
+                    | DataType::Utf8
+                    | DataType::LargeUtf8
+                    | DataType::Binary
+                    | DataType::LargeBinary
+                    | DataType::FixedSizeBinary(_)
+            ) =>
+        {
+            visitor.primitive(p)
+        }
+        DataType::List(element_field) => visit_list(r#type, element_field, 
visitor),
+        DataType::LargeList(element_field) => visit_list(r#type, 
element_field, visitor),
+        DataType::FixedSizeList(element_field, _) => visit_list(r#type, 
element_field, visitor),
+        DataType::Map(field, _) => match field.data_type() {
+            DataType::Struct(fields) => {
+                if fields.len() != 2 {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Map field must have exactly 2 fields",
+                    ));
+                }
+
+                let key_field = &fields[0];
+                let value_field = &fields[1];
+
+                let key_result = {
+                    visitor.before_map_key(key_field)?;
+                    let ret = visit_type(key_field.data_type(), visitor)?;
+                    visitor.after_map_key(key_field)?;
+                    ret
+                };
+
+                let value_result = {
+                    visitor.before_map_value(value_field)?;
+                    let ret = visit_type(value_field.data_type(), visitor)?;
+                    visitor.after_map_value(value_field)?;
+                    ret
+                };
+
+                visitor.map(r#type, key_result, value_result)
+            }
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Map field must have struct type",
+            )),
+        },
+        DataType::Struct(fields) => visit_struct(fields, visitor),
+        other => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Cannot visit Arrow data type: {other}"),
+        )),
+    }
+}
+
+/// Visit list types in post order.
+#[allow(dead_code)]
+fn visit_list<V: ArrowSchemaVisitor>(
+    data_type: &DataType,
+    element_field: &Field,
+    visitor: &mut V,
+) -> Result<V::T> {
+    visitor.before_list_element(element_field)?;
+    let value = visit_type(element_field.data_type(), visitor)?;
+    visitor.after_list_element(element_field)?;
+    visitor.list(data_type, value)
+}
+
+/// Visit struct type in post order.
+#[allow(dead_code)]
+fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> 
Result<V::T> {
+    let mut results = Vec::with_capacity(fields.len());
+    for field in fields {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+
+    visitor.r#struct(fields, results)
+}
+
+/// Visit schema in post order.
+#[allow(dead_code)]
+fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) 
-> Result<V::U> {
+    let mut results = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+    visitor.schema(schema, results)
+}
+
+/// Convert Arrow schema to Iceberg schema.
+#[allow(dead_code)]
+pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
+    let mut visitor = ArrowSchemaConverter::new();
+    visit_schema(schema, &mut visitor)
+}
+
+const ARROW_FIELD_ID_KEY: &str = "PARQUET:field_id";
+const ARROW_FIELD_DOC_KEY: &str = "doc";
+
+fn get_field_id(field: &Field) -> Result<i32> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) {
+        return value.parse::<i32>().map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Failed to parse field id: {e}"),
+            )
+        });
+    }
+    Err(Error::new(
+        ErrorKind::DataInvalid,
+        "Field id not found in metadata",
+    ))
+}
+
+fn get_field_doc(field: &Field) -> Option<String> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
+        return Some(value.clone());
+    }
+    None
+}
+
+struct ArrowSchemaConverter {}
+
+impl ArrowSchemaConverter {
+    #[allow(dead_code)]
+    fn new() -> Self {
+        Self {}
+    }
+
+    fn convert_fields(fields: &Fields, field_results: &[Type]) -> 
Result<Vec<NestedFieldRef>> {
+        let mut results = Vec::with_capacity(fields.len());
+        for i in 0..fields.len() {
+            let field = &fields[i];
+            let field_type = &field_results[i];
+            let id = get_field_id(field)?;
+            let doc = get_field_doc(field);
+            let nested_field = NestedField {
+                id,
+                doc,
+                name: field.name().clone(),
+                required: !field.is_nullable(),
+                field_type: Box::new(field_type.clone()),
+                initial_default: None,
+                write_default: None,
+            };
+            results.push(Arc::new(nested_field));
+        }
+        Ok(results)
+    }
+}
+
+impl ArrowSchemaVisitor for ArrowSchemaConverter {
+    type T = Type;
+    type U = Schema;
+
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U> {
+        let fields = Self::convert_fields(schema.fields(), &values)?;
+        let builder = Schema::builder().with_fields(fields);
+        builder.build()
+    }
+
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T> {
+        let fields = Self::convert_fields(fields, &results)?;
+        Ok(Type::Struct(StructType::new(fields)))
+    }
+
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
+        let element_field = match list {
+            DataType::List(element_field) => element_field,
+            DataType::LargeList(element_field) => element_field,
+            DataType::FixedSizeList(element_field, _) => element_field,
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "List type must have list data type",
+                ))
+            }
+        };
+
+        let id = get_field_id(element_field)?;
+        let doc = get_field_doc(element_field);
+        let element_field = Arc::new(NestedField {
+            id,
+            doc,
+            name: "element".to_string(),
+            required: !element_field.is_nullable(),
+            field_type: Box::new(value.clone()),
+            initial_default: None,
+            write_default: None,
+        });
+        Ok(Type::List(ListType { element_field }))
+    }
+
+    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> 
Result<Self::T> {
+        match map {
+            DataType::Map(field, _) => match field.data_type() {
+                DataType::Struct(fields) => {
+                    if fields.len() != 2 {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            "Map field must have exactly 2 fields",
+                        ));
+                    }
+
+                    let key_field = &fields[0];
+                    let value_field = &fields[1];
+
+                    let key_id = get_field_id(key_field)?;
+                    let key_doc = get_field_doc(key_field);
+                    let key_field = Arc::new(NestedField {
+                        id: key_id,
+                        doc: key_doc,
+                        name: "key".to_string(),
+                        required: !key_field.is_nullable(),
+                        field_type: Box::new(key_value.clone()),
+                        initial_default: None,
+                        write_default: None,
+                    });
+
+                    let value_id = get_field_id(value_field)?;

Review Comment:
   How about using the `NestedField::map_value_element` method here?



##########
crates/iceberg/src/arrow.rs:
##########
@@ -106,3 +114,732 @@ impl ArrowReader {
         ProjectionMask::all()
     }
 }
+
+/// A post order arrow schema visitor.
+///
+/// For order of methods called, please refer to [`visit_schema`].
+pub trait ArrowSchemaVisitor {
+    /// Return type of this visitor on arrow field.
+    type T;
+
+    /// Return type of this visitor on arrow schema.
+    type U;
+
+    /// Called before struct/list/map field.
+    fn before_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after struct/list/map field.
+    fn after_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before list element.
+    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after list element.
+    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map key.
+    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map key.
+    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map value.
+    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map value.
+    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after schema's type visited.
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U>;
+
+    /// Called after struct's fields visited.
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T>;
+
+    /// Called after list fields visited.
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;
+
+    /// Called after map's key and value fields visited.
+    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> 
Result<Self::T>;
+
+    /// Called when see a primitive type.
+    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
+}
+
+/// Visiting a type in post order.
+fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> 
Result<V::T> {
+    match r#type {
+        p if p.is_primitive()
+            || matches!(
+                p,
+                DataType::Boolean
+                    | DataType::Utf8
+                    | DataType::LargeUtf8
+                    | DataType::Binary
+                    | DataType::LargeBinary
+                    | DataType::FixedSizeBinary(_)
+            ) =>
+        {
+            visitor.primitive(p)
+        }
+        DataType::List(element_field) => visit_list(r#type, element_field, 
visitor),
+        DataType::LargeList(element_field) => visit_list(r#type, 
element_field, visitor),
+        DataType::FixedSizeList(element_field, _) => visit_list(r#type, 
element_field, visitor),
+        DataType::Map(field, _) => match field.data_type() {
+            DataType::Struct(fields) => {
+                if fields.len() != 2 {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Map field must have exactly 2 fields",
+                    ));
+                }
+
+                let key_field = &fields[0];
+                let value_field = &fields[1];
+
+                let key_result = {
+                    visitor.before_map_key(key_field)?;
+                    let ret = visit_type(key_field.data_type(), visitor)?;
+                    visitor.after_map_key(key_field)?;
+                    ret
+                };
+
+                let value_result = {
+                    visitor.before_map_value(value_field)?;
+                    let ret = visit_type(value_field.data_type(), visitor)?;
+                    visitor.after_map_value(value_field)?;
+                    ret
+                };
+
+                visitor.map(r#type, key_result, value_result)
+            }
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Map field must have struct type",
+            )),
+        },
+        DataType::Struct(fields) => visit_struct(fields, visitor),
+        other => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Cannot visit Arrow data type: {other}"),
+        )),
+    }
+}
+
+/// Visit list types in post order.
+#[allow(dead_code)]
+fn visit_list<V: ArrowSchemaVisitor>(
+    data_type: &DataType,
+    element_field: &Field,
+    visitor: &mut V,
+) -> Result<V::T> {
+    visitor.before_list_element(element_field)?;
+    let value = visit_type(element_field.data_type(), visitor)?;
+    visitor.after_list_element(element_field)?;
+    visitor.list(data_type, value)
+}
+
+/// Visit struct type in post order.
+#[allow(dead_code)]
+fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> 
Result<V::T> {
+    let mut results = Vec::with_capacity(fields.len());
+    for field in fields {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+
+    visitor.r#struct(fields, results)
+}
+
+/// Visit schema in post order.
+#[allow(dead_code)]
+fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) 
-> Result<V::U> {
+    let mut results = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+    visitor.schema(schema, results)
+}
+
+/// Convert Arrow schema to Iceberg schema.
+#[allow(dead_code)]
+pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
+    let mut visitor = ArrowSchemaConverter::new();
+    visit_schema(schema, &mut visitor)
+}
+
+const ARROW_FIELD_ID_KEY: &str = "PARQUET:field_id";
+const ARROW_FIELD_DOC_KEY: &str = "doc";
+
+fn get_field_id(field: &Field) -> Result<i32> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) {
+        return value.parse::<i32>().map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Failed to parse field id: {e}"),
+            )
+        });
+    }
+    Err(Error::new(
+        ErrorKind::DataInvalid,
+        "Field id not found in metadata",
+    ))
+}
+
+fn get_field_doc(field: &Field) -> Option<String> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
+        return Some(value.clone());
+    }
+    None
+}
+
+struct ArrowSchemaConverter {}
+
+impl ArrowSchemaConverter {
+    #[allow(dead_code)]
+    fn new() -> Self {
+        Self {}
+    }
+
+    fn convert_fields(fields: &Fields, field_results: &[Type]) -> 
Result<Vec<NestedFieldRef>> {
+        let mut results = Vec::with_capacity(fields.len());
+        for i in 0..fields.len() {
+            let field = &fields[i];
+            let field_type = &field_results[i];
+            let id = get_field_id(field)?;
+            let doc = get_field_doc(field);
+            let nested_field = NestedField {
+                id,
+                doc,
+                name: field.name().clone(),
+                required: !field.is_nullable(),
+                field_type: Box::new(field_type.clone()),
+                initial_default: None,
+                write_default: None,
+            };
+            results.push(Arc::new(nested_field));
+        }
+        Ok(results)
+    }
+}
+
+impl ArrowSchemaVisitor for ArrowSchemaConverter {
+    type T = Type;
+    type U = Schema;
+
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U> {
+        let fields = Self::convert_fields(schema.fields(), &values)?;
+        let builder = Schema::builder().with_fields(fields);
+        builder.build()
+    }
+
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T> {
+        let fields = Self::convert_fields(fields, &results)?;
+        Ok(Type::Struct(StructType::new(fields)))
+    }
+
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
+        let element_field = match list {
+            DataType::List(element_field) => element_field,
+            DataType::LargeList(element_field) => element_field,
+            DataType::FixedSizeList(element_field, _) => element_field,
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "List type must have list data type",
+                ))
+            }
+        };
+
+        let id = get_field_id(element_field)?;
+        let doc = get_field_doc(element_field);
+        let element_field = Arc::new(NestedField {
+            id,
+            doc,
+            name: "element".to_string(),
+            required: !element_field.is_nullable(),
+            field_type: Box::new(value.clone()),
+            initial_default: None,
+            write_default: None,
+        });
+        Ok(Type::List(ListType { element_field }))
+    }
+
+    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> 
Result<Self::T> {
+        match map {
+            DataType::Map(field, _) => match field.data_type() {
+                DataType::Struct(fields) => {
+                    if fields.len() != 2 {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            "Map field must have exactly 2 fields",
+                        ));
+                    }
+
+                    let key_field = &fields[0];
+                    let value_field = &fields[1];
+
+                    let key_id = get_field_id(key_field)?;
+                    let key_doc = get_field_doc(key_field);
+                    let key_field = Arc::new(NestedField {
+                        id: key_id,
+                        doc: key_doc,
+                        name: "key".to_string(),
+                        required: !key_field.is_nullable(),
+                        field_type: Box::new(key_value.clone()),
+                        initial_default: None,
+                        write_default: None,
+                    });
+
+                    let value_id = get_field_id(value_field)?;
+                    let value_doc = get_field_doc(value_field);
+                    let value_field = Arc::new(NestedField {
+                        id: value_id,
+                        doc: value_doc,
+                        name: "value".to_string(),
+                        required: !value_field.is_nullable(),
+                        field_type: Box::new(value.clone()),
+                        initial_default: None,
+                        write_default: None,
+                    });
+
+                    Ok(Type::Map(MapType {
+                        key_field,
+                        value_field,
+                    }))
+                }
+                _ => Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "Map field must have struct type",
+                )),
+            },
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Map type must have map data type",
+            )),
+        }
+    }
+
+    fn primitive(&mut self, p: &DataType) -> Result<Self::T> {
+        match p {
+            DataType::Boolean => Ok(Type::Primitive(PrimitiveType::Boolean)),
+            DataType::Int32 => Ok(Type::Primitive(PrimitiveType::Int)),
+            DataType::Int64 => Ok(Type::Primitive(PrimitiveType::Long)),
+            DataType::Float32 => Ok(Type::Primitive(PrimitiveType::Float)),
+            DataType::Float64 => Ok(Type::Primitive(PrimitiveType::Double)),
+            DataType::Decimal128(p, s) => 
Ok(Type::Primitive(PrimitiveType::Decimal {

Review Comment:
   Use `Type::decimal` instead? Iceberg has some restrictions on a decimal's 
precision and scale. 



##########
crates/iceberg/src/arrow.rs:
##########
@@ -106,3 +114,732 @@ impl ArrowReader {
         ProjectionMask::all()
     }
 }
+
+/// A post order arrow schema visitor.
+///
+/// For order of methods called, please refer to [`visit_schema`].
+pub trait ArrowSchemaVisitor {
+    /// Return type of this visitor on arrow field.
+    type T;
+
+    /// Return type of this visitor on arrow schema.
+    type U;
+
+    /// Called before struct/list/map field.
+    fn before_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after struct/list/map field.
+    fn after_field(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before list element.
+    fn before_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after list element.
+    fn after_list_element(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map key.
+    fn before_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map key.
+    fn after_map_key(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called before map value.
+    fn before_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after map value.
+    fn after_map_value(&mut self, _field: &Field) -> Result<()> {
+        Ok(())
+    }
+
+    /// Called after schema's type visited.
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U>;
+
+    /// Called after struct's fields visited.
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T>;
+
+    /// Called after list fields visited.
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T>;
+
+    /// Called after map's key and value fields visited.
+    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> 
Result<Self::T>;
+
+    /// Called when see a primitive type.
+    fn primitive(&mut self, p: &DataType) -> Result<Self::T>;
+}
+
+/// Visiting a type in post order.
+fn visit_type<V: ArrowSchemaVisitor>(r#type: &DataType, visitor: &mut V) -> 
Result<V::T> {
+    match r#type {
+        p if p.is_primitive()
+            || matches!(
+                p,
+                DataType::Boolean
+                    | DataType::Utf8
+                    | DataType::LargeUtf8
+                    | DataType::Binary
+                    | DataType::LargeBinary
+                    | DataType::FixedSizeBinary(_)
+            ) =>
+        {
+            visitor.primitive(p)
+        }
+        DataType::List(element_field) => visit_list(r#type, element_field, 
visitor),
+        DataType::LargeList(element_field) => visit_list(r#type, 
element_field, visitor),
+        DataType::FixedSizeList(element_field, _) => visit_list(r#type, 
element_field, visitor),
+        DataType::Map(field, _) => match field.data_type() {
+            DataType::Struct(fields) => {
+                if fields.len() != 2 {
+                    return Err(Error::new(
+                        ErrorKind::DataInvalid,
+                        "Map field must have exactly 2 fields",
+                    ));
+                }
+
+                let key_field = &fields[0];
+                let value_field = &fields[1];
+
+                let key_result = {
+                    visitor.before_map_key(key_field)?;
+                    let ret = visit_type(key_field.data_type(), visitor)?;
+                    visitor.after_map_key(key_field)?;
+                    ret
+                };
+
+                let value_result = {
+                    visitor.before_map_value(value_field)?;
+                    let ret = visit_type(value_field.data_type(), visitor)?;
+                    visitor.after_map_value(value_field)?;
+                    ret
+                };
+
+                visitor.map(r#type, key_result, value_result)
+            }
+            _ => Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Map field must have struct type",
+            )),
+        },
+        DataType::Struct(fields) => visit_struct(fields, visitor),
+        other => Err(Error::new(
+            ErrorKind::DataInvalid,
+            format!("Cannot visit Arrow data type: {other}"),
+        )),
+    }
+}
+
+/// Visit list types in post order.
+#[allow(dead_code)]
+fn visit_list<V: ArrowSchemaVisitor>(
+    data_type: &DataType,
+    element_field: &Field,
+    visitor: &mut V,
+) -> Result<V::T> {
+    visitor.before_list_element(element_field)?;
+    let value = visit_type(element_field.data_type(), visitor)?;
+    visitor.after_list_element(element_field)?;
+    visitor.list(data_type, value)
+}
+
+/// Visit struct type in post order.
+#[allow(dead_code)]
+fn visit_struct<V: ArrowSchemaVisitor>(fields: &Fields, visitor: &mut V) -> 
Result<V::T> {
+    let mut results = Vec::with_capacity(fields.len());
+    for field in fields {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+
+    visitor.r#struct(fields, results)
+}
+
+/// Visit schema in post order.
+#[allow(dead_code)]
+fn visit_schema<V: ArrowSchemaVisitor>(schema: &ArrowSchema, visitor: &mut V) 
-> Result<V::U> {
+    let mut results = Vec::with_capacity(schema.fields().len());
+    for field in schema.fields() {
+        visitor.before_field(field)?;
+        let result = visit_type(field.data_type(), visitor)?;
+        visitor.after_field(field)?;
+        results.push(result);
+    }
+    visitor.schema(schema, results)
+}
+
+/// Convert Arrow schema to Iceberg schema.
+#[allow(dead_code)]
+pub fn arrow_schema_to_schema(schema: &ArrowSchema) -> Result<Schema> {
+    let mut visitor = ArrowSchemaConverter::new();
+    visit_schema(schema, &mut visitor)
+}
+
+const ARROW_FIELD_ID_KEY: &str = "PARQUET:field_id";
+const ARROW_FIELD_DOC_KEY: &str = "doc";
+
+fn get_field_id(field: &Field) -> Result<i32> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_ID_KEY) {
+        return value.parse::<i32>().map_err(|e| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!("Failed to parse field id: {e}"),
+            )
+        });
+    }
+    Err(Error::new(
+        ErrorKind::DataInvalid,
+        "Field id not found in metadata",
+    ))
+}
+
+fn get_field_doc(field: &Field) -> Option<String> {
+    if let Some(value) = field.metadata().get(ARROW_FIELD_DOC_KEY) {
+        return Some(value.clone());
+    }
+    None
+}
+
+struct ArrowSchemaConverter {}
+
+impl ArrowSchemaConverter {
+    #[allow(dead_code)]
+    fn new() -> Self {
+        Self {}
+    }
+
+    fn convert_fields(fields: &Fields, field_results: &[Type]) -> 
Result<Vec<NestedFieldRef>> {
+        let mut results = Vec::with_capacity(fields.len());
+        for i in 0..fields.len() {
+            let field = &fields[i];
+            let field_type = &field_results[i];
+            let id = get_field_id(field)?;
+            let doc = get_field_doc(field);
+            let nested_field = NestedField {
+                id,
+                doc,
+                name: field.name().clone(),
+                required: !field.is_nullable(),
+                field_type: Box::new(field_type.clone()),
+                initial_default: None,
+                write_default: None,
+            };
+            results.push(Arc::new(nested_field));
+        }
+        Ok(results)
+    }
+}
+
+impl ArrowSchemaVisitor for ArrowSchemaConverter {
+    type T = Type;
+    type U = Schema;
+
+    fn schema(&mut self, schema: &ArrowSchema, values: Vec<Self::T>) -> 
Result<Self::U> {
+        let fields = Self::convert_fields(schema.fields(), &values)?;
+        let builder = Schema::builder().with_fields(fields);
+        builder.build()
+    }
+
+    fn r#struct(&mut self, fields: &Fields, results: Vec<Self::T>) -> 
Result<Self::T> {
+        let fields = Self::convert_fields(fields, &results)?;
+        Ok(Type::Struct(StructType::new(fields)))
+    }
+
+    fn list(&mut self, list: &DataType, value: Self::T) -> Result<Self::T> {
+        let element_field = match list {
+            DataType::List(element_field) => element_field,
+            DataType::LargeList(element_field) => element_field,
+            DataType::FixedSizeList(element_field, _) => element_field,
+            _ => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    "List type must have list data type",
+                ))
+            }
+        };
+
+        let id = get_field_id(element_field)?;
+        let doc = get_field_doc(element_field);
+        let element_field = Arc::new(NestedField {
+            id,
+            doc,
+            name: "element".to_string(),
+            required: !element_field.is_nullable(),
+            field_type: Box::new(value.clone()),
+            initial_default: None,
+            write_default: None,
+        });
+        Ok(Type::List(ListType { element_field }))
+    }
+
+    fn map(&mut self, map: &DataType, key_value: Self::T, value: Self::T) -> 
Result<Self::T> {
+        match map {
+            DataType::Map(field, _) => match field.data_type() {
+                DataType::Struct(fields) => {
+                    if fields.len() != 2 {
+                        return Err(Error::new(
+                            ErrorKind::DataInvalid,
+                            "Map field must have exactly 2 fields",
+                        ));
+                    }
+
+                    let key_field = &fields[0];
+                    let value_field = &fields[1];
+
+                    let key_id = get_field_id(key_field)?;
+                    let key_doc = get_field_doc(key_field);
+                    let key_field = Arc::new(NestedField {

Review Comment:
   Use the `NestedField::map_key_element` method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to