This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a776837c94 Implement ArrowSchema to AvroSchema conversion logic in arrow-avro (#8075)
a776837c94 is described below

commit a776837c94a672bebab9dab466c155cc14c98436
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Aug 12 12:18:49 2025 -0500

    Implement ArrowSchema to AvroSchema conversion logic in arrow-avro (#8075)
    
    # Which issue does this PR close?
    
    - Part of https://github.com/apache/arrow-rs/issues/4886
    
    # Rationale for this change
    
    This change introduces functionality to convert an `ArrowSchema` into an
    `AvroSchema`. This is a crucial feature for improving interoperability
    between Arrow and Avro. By enabling direct schema conversion, we
    simplify schema evolution support by creating `AvroSchema` instances
    directly from an arrow-rs `Schema`. Additionally, these updates are
    foundational for the upcoming `arrow-avro` `Writer`.
    
    # What changes are included in this PR?
    
    - **`TryFrom<&ArrowSchema> for AvroSchema`**: The core of this PR is the
    implementation of the `TryFrom` trait to allow a fallible conversion
    from an `ArrowSchema` reference to a new `AvroSchema`.
    - **Type Mapping Logic**: Added comprehensive logic to map Arrow
    `DataType` variants to their corresponding Avro type representations.
    This includes:
        - Primitive types (`Boolean`, `Int`, `Float`, `Binary`, `Utf8`).
        - Logical types (e.g., `Timestamp`, `Date`, `Decimal`, `UUID`).
        - Complex types (`Struct`, `List`, `Map`, `Dictionary`). Dictionaries
        that carry Avro enum symbol metadata are converted to Avro `enum` types.
    - **Name Sanitization**: Implemented a `NameGenerator` to ensure that
    field names derived from the `ArrowSchema` are valid according to Avro
    naming conventions and are unique within their scope.
    - **Metadata Handling**: The conversion preserves relevant metadata from
    the Arrow schema.
    - **Metadata Constants**: Added `arrow-avro` metadata key constants to
    simplify working with Avro metadata stored in Arrow schemas.
    
    # Are these changes tested?
    
    Yes, this change is accompanied by new tests in `schema.rs`. The tests
    cover:
    - Correct mapping of all supported primitive, temporal, and logical
    types.
    - Conversion of complex and nested structures like `Struct`, `List`, and
    `Map`.
    - Proper handling of dictionary-encoded fields to Avro enums.
    - Validation of name sanitization logic.
    - Round-trip conversion tests for various data types to ensure
    correctness.
    
    # Are there any user-facing changes?
    
    N/A
---
 arrow-avro/src/codec.rs      |   9 +-
 arrow-avro/src/reader/mod.rs |   8 +-
 arrow-avro/src/schema.rs     | 655 ++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 660 insertions(+), 12 deletions(-)

diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index dcd3984501..a10e3a238d 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -15,7 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::schema::{Attributes, AvroSchema, ComplexType, PrimitiveType, Record, Schema, TypeName};
+use crate::schema::{
+    Attributes, AvroSchema, ComplexType, PrimitiveType, Record, Schema, TypeName,
+    AVRO_ENUM_SYMBOLS_METADATA_KEY,
+};
 use arrow_schema::{
     ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION,
     DECIMAL128_MAX_SCALE,
@@ -623,7 +626,7 @@ fn make_data_type<'a>(
                 let symbols_json = serde_json::to_string(&e.symbols).map_err(|e| {
                     ArrowError::ParseError(format!("Failed to serialize enum symbols: {e}"))
                 })?;
-                metadata.insert("avro.enum.symbols".to_string(), symbols_json);
+                metadata.insert(AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(), symbols_json);
                 let field = AvroDataType {
                     nullability: None,
                     metadata,
@@ -780,11 +783,9 @@ mod tests {
     #[test]
     fn test_uuid_type() {
         let mut codec = Codec::Fixed(16);
-
         if let c @ Codec::Fixed(16) = &mut codec {
             *c = Codec::Uuid;
         }
-
         assert!(matches!(codec, Codec::Uuid));
     }
 
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index e9bf7af61e..1f741d6d53 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -595,7 +595,7 @@ mod test {
     use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
     use crate::schema::{
         AvroSchema, Fingerprint, FingerprintAlgorithm, PrimitiveType, Schema as AvroRaw,
-        SchemaStore, SINGLE_OBJECT_MAGIC,
+        SchemaStore, AVRO_ENUM_SYMBOLS_METADATA_KEY, SINGLE_OBJECT_MAGIC,
     };
     use crate::test_util::arrow_test_data;
     use arrow::array::ArrayDataBuilder;
@@ -1420,19 +1420,19 @@ mod test {
                 DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
             let mut md_f1 = HashMap::new();
             md_f1.insert(
-                "avro.enum.symbols".to_string(),
+                AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
                 r#"["a","b","c","d"]"#.to_string(),
             );
             let f1_field = Field::new("f1", dict_type.clone(), false).with_metadata(md_f1);
             let mut md_f2 = HashMap::new();
             md_f2.insert(
-                "avro.enum.symbols".to_string(),
+                AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
                 r#"["e","f","g","h"]"#.to_string(),
             );
             let f2_field = Field::new("f2", dict_type.clone(), false).with_metadata(md_f2);
             let mut md_f3 = HashMap::new();
             md_f3.insert(
-                "avro.enum.symbols".to_string(),
+                AVRO_ENUM_SYMBOLS_METADATA_KEY.to_string(),
                 r#"["i","j","k"]"#.to_string(),
             );
             let f3_field = Field::new("f3", dict_type.clone(), true).with_metadata(md_f3);
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index 539e7b02f3..2f1c0a2bcf 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -15,12 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow_schema::ArrowError;
+use arrow_schema::{
+    ArrowError, DataType, Field as ArrowField, IntervalUnit, Schema as ArrowSchema, TimeUnit,
+};
 use serde::{Deserialize, Serialize};
-use serde_json::{json, Value};
+use serde_json::{json, Map as JsonMap, Value};
 use std::cmp::PartialEq;
 use std::collections::hash_map::Entry;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use strum_macros::AsRefStr;
 
 /// The metadata key used for storing the JSON encoded [`Schema`]
@@ -29,6 +31,21 @@ pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
 /// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`)
 pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];
 
+/// Metadata key used to represent Avro enum symbols in an Arrow schema.
+pub const AVRO_ENUM_SYMBOLS_METADATA_KEY: &str = "avro.enum.symbols";
+
+/// Metadata key used to store the default value of a field in an Avro schema.
+pub const AVRO_FIELD_DEFAULT_METADATA_KEY: &str = "avro.field.default";
+
+/// Metadata key used to store the name of a type in an Avro schema.
+pub const AVRO_NAME_METADATA_KEY: &str = "avro.name";
+
+/// Metadata key used to store the namespace of a type in an Avro schema.
+pub const AVRO_NAMESPACE_METADATA_KEY: &str = "avro.namespace";
+
+/// Metadata key used to store the documentation for a type in an Avro schema.
+pub const AVRO_DOC_METADATA_KEY: &str = "avro.doc";
+
 /// Compare two Avro schemas for equality (identical schemas).
 /// Returns true if the schemas have the same parsing canonical form (i.e., logically identical).
 pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result<bool, ArrowError> {
@@ -284,6 +301,57 @@ pub struct AvroSchema {
     pub json_string: String,
 }
 
+impl TryFrom<&ArrowSchema> for AvroSchema {
+    type Error = ArrowError;
+
+    /// Builds an Avro record schema (as a JSON string) from an Arrow schema.
+    ///
+    /// If the Arrow schema metadata already holds Avro JSON under
+    /// [`SCHEMA_METADATA_KEY`], that JSON is passed to [`AvroSchema::new`]
+    /// unchanged. Otherwise a top-level `record` is assembled:
+    ///
+    /// * the record name comes from [`AVRO_NAME_METADATA_KEY`] (default
+    ///   `topLevelRecord`) and is sanitised;
+    /// * `namespace` and `doc` come from [`AVRO_NAMESPACE_METADATA_KEY`] and
+    ///   [`AVRO_DOC_METADATA_KEY`] when present;
+    /// * every remaining metadata entry whose key neither starts with `avro.`
+    ///   nor is an internal Arrow key is copied as a record attribute. Its
+    ///   value is parsed as JSON when possible and kept as a JSON string
+    ///   otherwise.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if a field cannot be converted or if the assembled
+    /// record cannot be serialised to JSON.
+    fn try_from(schema: &ArrowSchema) -> Result<Self, Self::Error> {
+        // Fast-path: schema already contains Avro JSON
+        if let Some(json) = schema.metadata.get(SCHEMA_METADATA_KEY) {
+            return Ok(AvroSchema::new(json.clone()));
+        }
+        let mut name_gen = NameGenerator::default();
+        let fields_json = schema
+            .fields()
+            .iter()
+            .map(|f| arrow_field_to_avro(f, &mut name_gen))
+            .collect::<Result<Vec<_>, _>>()?;
+        // Assemble top-level record
+        let record_name = schema
+            .metadata
+            .get(AVRO_NAME_METADATA_KEY)
+            .map_or("topLevelRecord", |s| s.as_str());
+        let mut record = JsonMap::with_capacity(schema.metadata.len() + 4);
+        record.insert("type".into(), Value::String("record".into()));
+        record.insert(
+            "name".into(),
+            Value::String(sanitise_avro_name(record_name)),
+        );
+        if let Some(ns) = schema.metadata.get(AVRO_NAMESPACE_METADATA_KEY) {
+            record.insert("namespace".into(), Value::String(ns.clone()));
+        }
+        if let Some(doc) = schema.metadata.get(AVRO_DOC_METADATA_KEY) {
+            record.insert("doc".into(), Value::String(doc.clone()));
+        }
+        record.insert("fields".into(), Value::Array(fields_json));
+        for (meta_key, meta_val) in &schema.metadata {
+            // Skip keys already handled or internal. The `avro.` prefix test
+            // also covers every `avro.schema.*` key, so no separate check for
+            // that prefix is needed.
+            if meta_key.starts_with("avro.") || is_internal_arrow_key(meta_key) {
+                continue;
+            }
+            let json_val =
+                serde_json::from_str(meta_val).unwrap_or_else(|_| Value::String(meta_val.clone()));
+            record.insert(meta_key.clone(), json_val);
+        }
+        let json_string = serde_json::to_string(&Value::Object(record))
+            .map_err(|e| ArrowError::SchemaError(format!("Serialising Avro JSON failed: {e}")))?;
+        Ok(AvroSchema::new(json_string))
+    }
+}
+
 impl AvroSchema {
     /// Creates a new `AvroSchema` from a JSON string.
     pub fn new(json_string: String) -> Self {
@@ -647,12 +715,336 @@ pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
     fp
 }
 
+/// Returns `true` when `key` is the embedded Avro schema key
+/// ([`SCHEMA_METADATA_KEY`]) or carries the `ARROW:` prefix.
+#[inline]
+fn is_internal_arrow_key(key: &str) -> bool {
+    if key == SCHEMA_METADATA_KEY {
+        return true;
+    }
+    key.strip_prefix("ARROW:").is_some()
+}
+
+/// Turns an arbitrary string into a valid Avro field or type name.
+///
+/// * An empty input yields `"_"`.
+/// * Every character that is not an ASCII letter, ASCII digit or `_` is
+///   replaced by `_`.
+/// * If the result would start with an ASCII digit, a `_` is prepended.
+fn sanitise_avro_name(base_name: &str) -> String {
+    let first = match base_name.chars().next() {
+        Some(ch) => ch,
+        None => return "_".to_owned(),
+    };
+    // One byte of headroom for the optional leading underscore.
+    let mut sanitised = String::with_capacity(base_name.len() + 1);
+    if first.is_ascii_digit() {
+        sanitised.push('_');
+    }
+    for ch in base_name.chars() {
+        let keep = ch == '_' || ch.is_ascii_alphanumeric();
+        sanitised.push(if keep { ch } else { '_' });
+    }
+    sanitised
+}
+
+/// Hands out sanitised names that are unique among all names it has
+/// returned so far.
+#[derive(Default)]
+struct NameGenerator {
+    // Every name returned so far.
+    used: HashSet<String>,
+    // Per base name: the numeric suffix to try next on a collision.
+    counters: HashMap<String, usize>,
+}
+
+impl NameGenerator {
+    /// Sanitises `field_name` and returns it if unused; otherwise returns the
+    /// first unused `<name>_<n>`, counting `n` upwards from the stored
+    /// counter for that name (starting at 1).
+    fn make_unique(&mut self, field_name: &str) -> String {
+        let base = sanitise_avro_name(field_name);
+        let is_new = self.used.insert(base.clone());
+        if is_new {
+            self.counters.insert(base.clone(), 1);
+            return base;
+        }
+        let suffix = self.counters.entry(base.clone()).or_insert(1);
+        let mut candidate = format!("{base}_{}", *suffix);
+        while !self.used.insert(candidate.clone()) {
+            *suffix += 1;
+            candidate = format!("{base}_{}", *suffix);
+        }
+        candidate
+    }
+}
+
+/// Folds `extras` (additional JSON attributes) into an Avro schema value.
+///
+/// * With no extras, `schema` is returned untouched.
+/// * An object schema is extended with the extras.
+/// * For an array (union) schema, the extras are merged into the first
+///   branch that is not the string `"null"`; if no such branch exists the
+///   extras are dropped.
+/// * Any other value (for example the string `"long"`) is wrapped in an
+///   object as its `type`, followed by the extras.
+fn merge_extras(schema: Value, extras: JsonMap<String, Value>) -> Value {
+    if extras.is_empty() {
+        return schema;
+    }
+    match schema {
+        Value::Object(mut map) => {
+            map.extend(extras);
+            Value::Object(map)
+        }
+        Value::Array(mut union) => {
+            if let Some(non_null) = union.iter_mut().find(|val| val.as_str() != Some("null")) {
+                // Move the branch out, merge, and put the result back in place.
+                let original = std::mem::take(non_null);
+                *non_null = merge_extras(original, extras);
+            }
+            Value::Array(union)
+        }
+        primitive => {
+            let mut map = JsonMap::with_capacity(extras.len() + 1);
+            map.insert("type".into(), primitive);
+            map.extend(extras);
+            Value::Object(map)
+        }
+    }
+}
+
+/// Converts an Arrow [`DataType`] into an Avro schema JSON [`Value`].
+///
+/// Returns the schema together with a map of "extras": additional attributes
+/// (for example `arrowTimeUnit`) that the caller is expected to fold into the
+/// schema, e.g. with `merge_extras`.
+///
+/// * `dt` - the Arrow type to convert.
+/// * `field_name` - base name for any named Avro type (`fixed`, `record`,
+///   `enum`) generated here; it is made unique through `name_gen`.
+/// * `metadata` - metadata of the field owning `dt`; consulted for
+///   `logicalType`, `ARROW:extension:name`, `size` and the enum symbols key.
+/// * `name_gen` - generator that keeps generated names unique.
+///
+/// # Errors
+///
+/// * `ArrowError::NotYetImplemented` for unions and for types without a
+///   mapping.
+/// * `ArrowError::SchemaError` when a map's `entries` field is not a
+///   two-field struct.
+/// * `ArrowError::ParseError` when the enum symbols metadata cannot be
+///   parsed as a JSON array of strings.
+fn datatype_to_avro(
+    dt: &DataType,
+    field_name: &str,
+    metadata: &HashMap<String, String>,
+    name_gen: &mut NameGenerator,
+) -> Result<(Value, JsonMap<String, Value>), ArrowError> {
+    let mut extras = JsonMap::new();
+    let val = match dt {
+        DataType::Null => Value::String("null".into()),
+        DataType::Boolean => Value::String("boolean".into()),
+        DataType::Int8 | DataType::Int16 | DataType::UInt8 | DataType::UInt16 | DataType::Int32 => {
+            Value::String("int".into())
+        }
+        DataType::UInt32 | DataType::Int64 | DataType::UInt64 => Value::String("long".into()),
+        DataType::Float16 | DataType::Float32 => Value::String("float".into()),
+        DataType::Float64 => Value::String("double".into()),
+        DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View => Value::String("string".into()),
+        DataType::Binary | DataType::LargeBinary => Value::String("bytes".into()),
+        DataType::FixedSizeBinary(len) => {
+            // A UUID is signalled either by `logicalType = uuid` (any length)
+            // or by the Arrow `uuid` extension name on a 16-byte value.
+            let is_uuid = metadata
+                .get("logicalType")
+                .is_some_and(|value| value == "uuid")
+                || (*len == 16
+                    && metadata
+                        .get("ARROW:extension:name")
+                        .is_some_and(|value| value == "uuid"));
+            if is_uuid {
+                json!({ "type": "string", "logicalType": "uuid" })
+            } else {
+                json!({
+                    "type": "fixed",
+                    "name": name_gen.make_unique(field_name),
+                    "size": len
+                })
+            }
+        }
+        DataType::Decimal128(precision, scale) | DataType::Decimal256(precision, scale) => {
+            // Prefer fixed if original size info present
+            let mut meta = JsonMap::from_iter([
+                ("logicalType".into(), json!("decimal")),
+                ("precision".into(), json!(*precision)),
+                ("scale".into(), json!(*scale)),
+            ]);
+            if let Some(size) = metadata
+                .get("size")
+                .and_then(|val| val.parse::<usize>().ok())
+            {
+                meta.insert("type".into(), json!("fixed"));
+                meta.insert("size".into(), json!(size));
+                meta.insert("name".into(), json!(name_gen.make_unique(field_name)));
+            } else {
+                meta.insert("type".into(), json!("bytes"));
+            }
+            Value::Object(meta)
+        }
+        DataType::Date32 => json!({ "type": "int", "logicalType": "date" }),
+        DataType::Date64 => json!({ "type": "long", "logicalType": "local-timestamp-millis" }),
+        DataType::Time32(unit) => match unit {
+            TimeUnit::Millisecond => json!({ "type": "int", "logicalType": "time-millis" }),
+            TimeUnit::Second => {
+                // No logical type is emitted for seconds; record the unit as an extra.
+                extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
+                Value::String("int".into())
+            }
+            _ => Value::String("int".into()),
+        },
+        DataType::Time64(unit) => match unit {
+            TimeUnit::Microsecond => json!({ "type": "long", "logicalType": "time-micros" }),
+            TimeUnit::Nanosecond => {
+                extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
+                Value::String("long".into())
+            }
+            _ => Value::String("long".into()),
+        },
+        DataType::Timestamp(unit, tz) => {
+            // A timezone selects the `timestamp-*` logical types, its absence
+            // the `local-timestamp-*` ones. Seconds and nanoseconds get a
+            // plain `long` plus an `arrowTimeUnit` extra.
+            let logical_type = match (unit, tz.is_some()) {
+                (TimeUnit::Millisecond, true) => "timestamp-millis",
+                (TimeUnit::Millisecond, false) => "local-timestamp-millis",
+                (TimeUnit::Microsecond, true) => "timestamp-micros",
+                (TimeUnit::Microsecond, false) => "local-timestamp-micros",
+                (TimeUnit::Second, _) => {
+                    extras.insert("arrowTimeUnit".into(), Value::String("second".into()));
+                    return Ok((Value::String("long".into()), extras));
+                }
+                (TimeUnit::Nanosecond, _) => {
+                    extras.insert("arrowTimeUnit".into(), Value::String("nanosecond".into()));
+                    return Ok((Value::String("long".into()), extras));
+                }
+            };
+            json!({ "type": "long", "logicalType": logical_type })
+        }
+        DataType::Duration(unit) => {
+            extras.insert(
+                "arrowDurationUnit".into(),
+                Value::String(format!("{unit:?}").to_lowercase()),
+            );
+            Value::String("long".into())
+        }
+        DataType::Interval(IntervalUnit::MonthDayNano) => json!({
+            "type": "fixed",
+            "name": name_gen.make_unique(&format!("{field_name}_duration")),
+            "size": 12,
+            "logicalType": "duration"
+        }),
+        DataType::Interval(IntervalUnit::YearMonth) => {
+            extras.insert(
+                "arrowIntervalUnit".into(),
+                Value::String("yearmonth".into()),
+            );
+            Value::String("long".into())
+        }
+        DataType::Interval(IntervalUnit::DayTime) => {
+            extras.insert("arrowIntervalUnit".into(), Value::String("daytime".into()));
+            Value::String("long".into())
+        }
+        DataType::List(child) | DataType::LargeList(child) => {
+            if matches!(dt, DataType::LargeList(_)) {
+                extras.insert("arrowLargeList".into(), Value::Bool(true));
+            }
+            let (items, ie) =
+                datatype_to_avro(child.data_type(), child.name(), child.metadata(), name_gen)?;
+            json!({
+                "type": "array",
+                "items": merge_extras(items, ie)
+            })
+        }
+        DataType::FixedSizeList(child, len) => {
+            extras.insert("arrowFixedSize".into(), json!(len));
+            let (items, ie) =
+                datatype_to_avro(child.data_type(), child.name(), child.metadata(), name_gen)?;
+            json!({
+                "type": "array",
+                "items": merge_extras(items, ie)
+            })
+        }
+        DataType::Map(entries, _) => {
+            // Only the value field is converted. Require exactly (key, value)
+            // so that a malformed entries struct yields an error instead of an
+            // out-of-bounds panic.
+            let value_field = match entries.data_type() {
+                DataType::Struct(fs) if fs.len() == 2 => &fs[1],
+                _ => {
+                    return Err(ArrowError::SchemaError(
+                        "Map 'entries' field must be Struct(key,value)".into(),
+                    ))
+                }
+            };
+            let (val_schema, value_entry) = datatype_to_avro(
+                value_field.data_type(),
+                value_field.name(),
+                value_field.metadata(),
+                name_gen,
+            )?;
+            json!({
+                "type": "map",
+                "values": merge_extras(val_schema, value_entry)
+            })
+        }
+        DataType::Struct(fields) => {
+            let avro_fields = fields
+                .iter()
+                .map(|field| arrow_field_to_avro(field, name_gen))
+                .collect::<Result<Vec<_>, _>>()?;
+            json!({
+                "type": "record",
+                "name": name_gen.make_unique(field_name),
+                "fields": avro_fields
+            })
+        }
+        DataType::Dictionary(_, value) => {
+            if let Some(j) = metadata.get(AVRO_ENUM_SYMBOLS_METADATA_KEY) {
+                // Deserialize into owned strings: borrowed `&str` targets are
+                // rejected by serde_json when a string contains JSON escapes.
+                let symbols: Vec<String> =
+                    serde_json::from_str(j).map_err(|e| ArrowError::ParseError(e.to_string()))?;
+                json!({
+                    "type": "enum",
+                    "name": name_gen.make_unique(field_name),
+                    "symbols": symbols
+                })
+            } else {
+                // Without enum symbols the dictionary maps to its value type.
+                let (inner, ie) = datatype_to_avro(value.as_ref(), field_name, metadata, name_gen)?;
+                merge_extras(inner, ie)
+            }
+        }
+        DataType::RunEndEncoded(_, values) => {
+            // Run-end encoding maps to the schema of its values.
+            let (inner, ie) = datatype_to_avro(
+                values.data_type(),
+                values.name(),
+                values.metadata(),
+                name_gen,
+            )?;
+            merge_extras(inner, ie)
+        }
+        DataType::Union(_, _) => {
+            return Err(ArrowError::NotYetImplemented(
+                "Arrow Union to Avro Union not yet supported".into(),
+            ))
+        }
+        other => {
+            return Err(ArrowError::NotYetImplemented(format!(
+                "Arrow type {other:?} has no Avro representation"
+            )))
+        }
+    };
+    Ok((val, extras))
+}
+
+/// Converts one Arrow field into an Avro record-field JSON object.
+///
+/// The field name is sanitised, the data type is converted with
+/// `datatype_to_avro`, and nullable fields are wrapped as
+/// `["null", <type>]`. Field metadata is then copied onto the object:
+///
+/// * internal Arrow keys are skipped;
+/// * [`AVRO_DOC_METADATA_KEY`] becomes `doc`;
+/// * [`AVRO_FIELD_DEFAULT_METADATA_KEY`] becomes `default`, parsed as JSON
+///   when possible;
+/// * any other key is copied under its own name (value parsed as JSON when
+///   possible), except `name` and `type`, which would overwrite the
+///   attributes derived from the Arrow field itself.
+///
+/// # Errors
+///
+/// Propagates any error from converting the field's data type.
+fn arrow_field_to_avro(
+    field: &ArrowField,
+    name_gen: &mut NameGenerator,
+) -> Result<Value, ArrowError> {
+    // Parse a metadata value as JSON, falling back to a plain JSON string.
+    fn json_or_string(raw: &str) -> Value {
+        serde_json::from_str(raw).unwrap_or_else(|_| Value::String(raw.to_owned()))
+    }
+
+    // Sanitize the field name to ensure Avro validity
+    let avro_name = sanitise_avro_name(field.name());
+    let (schema, extras) =
+        datatype_to_avro(field.data_type(), &avro_name, field.metadata(), name_gen)?;
+    // If nullable, wrap `[ "null", <type> ]`, NOTE: second order nullability to be added in a follow-up
+    let schema = if field.is_nullable() {
+        Value::Array(vec![
+            Value::String("null".into()),
+            merge_extras(schema, extras),
+        ])
+    } else {
+        merge_extras(schema, extras)
+    };
+    // Build the field map
+    let mut map = JsonMap::with_capacity(field.metadata().len() + 3);
+    map.insert("name".into(), Value::String(avro_name));
+    map.insert("type".into(), schema);
+    // Transfer selected metadata
+    for (meta_key, meta_val) in field.metadata() {
+        if is_internal_arrow_key(meta_key) {
+            continue;
+        }
+        match meta_key.as_str() {
+            AVRO_DOC_METADATA_KEY => {
+                map.insert("doc".into(), Value::String(meta_val.clone()));
+            }
+            AVRO_FIELD_DEFAULT_METADATA_KEY => {
+                map.insert("default".into(), json_or_string(meta_val));
+            }
+            // Never let free-form metadata clobber the field's own name/type.
+            "name" | "type" => {}
+            _ => {
+                map.insert(meta_key.clone(), json_or_string(meta_val));
+            }
+        }
+    }
+    Ok(Value::Object(map))
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::codec::{AvroDataType, AvroField};
-    use arrow_schema::{DataType, Fields, TimeUnit};
+    use arrow_schema::{DataType, Fields, SchemaBuilder, TimeUnit};
     use serde_json::json;
+    use std::sync::Arc;
 
     fn int_schema() -> Schema<'static> {
         Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
@@ -682,6 +1074,19 @@ mod tests {
         }))
     }
 
+    fn single_field_schema(field: ArrowField) -> arrow_schema::Schema {
+        let mut sb = SchemaBuilder::new();
+        sb.push(field);
+        sb.finish()
+    }
+
+    fn assert_json_contains(avro_json: &str, needle: &str) {
+        assert!(
+            avro_json.contains(needle),
+            "JSON did not contain `{needle}` : {avro_json}"
+        )
+    }
+
     #[test]
     fn test_deserialize() {
         let t: Schema = serde_json::from_str("\"string\"").unwrap();
@@ -1120,4 +1525,246 @@ mod tests {
         let canonical_form = generate_canonical_form(&schema_with_attrs).unwrap();
         assert_eq!(canonical_form, expected_canonical_form);
     }
+
+    #[test]
+    fn test_primitive_mappings() {
+        let cases = vec![
+            (DataType::Boolean, "\"boolean\""),
+            (DataType::Int8, "\"int\""),
+            (DataType::Int16, "\"int\""),
+            (DataType::Int32, "\"int\""),
+            (DataType::Int64, "\"long\""),
+            (DataType::UInt8, "\"int\""),
+            (DataType::UInt16, "\"int\""),
+            (DataType::UInt32, "\"long\""),
+            (DataType::UInt64, "\"long\""),
+            (DataType::Float16, "\"float\""),
+            (DataType::Float32, "\"float\""),
+            (DataType::Float64, "\"double\""),
+            (DataType::Utf8, "\"string\""),
+            (DataType::Binary, "\"bytes\""),
+        ];
+        for (dt, avro_token) in cases {
+            let field = ArrowField::new("col", dt.clone(), false);
+            let arrow_schema = single_field_schema(field);
+            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
+            assert_json_contains(&avro.json_string, avro_token);
+        }
+    }
+
+    #[test]
+    fn test_temporal_mappings() {
+        let cases = vec![
+            (DataType::Date32, "\"logicalType\":\"date\""),
+            (
+                DataType::Time32(TimeUnit::Millisecond),
+                "\"logicalType\":\"time-millis\"",
+            ),
+            (
+                DataType::Time64(TimeUnit::Microsecond),
+                "\"logicalType\":\"time-micros\"",
+            ),
+            (
+                DataType::Timestamp(TimeUnit::Millisecond, None),
+                "\"logicalType\":\"local-timestamp-millis\"",
+            ),
+            (
+                DataType::Timestamp(TimeUnit::Microsecond, 
Some("+00:00".into())),
+                "\"logicalType\":\"timestamp-micros\"",
+            ),
+        ];
+        for (dt, needle) in cases {
+            let field = ArrowField::new("ts", dt.clone(), true);
+            let arrow_schema = single_field_schema(field);
+            let avro = AvroSchema::try_from(&arrow_schema).unwrap();
+            assert_json_contains(&avro.json_string, needle);
+        }
+    }
+
+    #[test]
+    fn test_decimal_and_uuid() {
+        let decimal_field = ArrowField::new("amount", DataType::Decimal128(25, 
2), false);
+        let dec_schema = single_field_schema(decimal_field);
+        let avro_dec = AvroSchema::try_from(&dec_schema).unwrap();
+        assert_json_contains(&avro_dec.json_string, 
"\"logicalType\":\"decimal\"");
+        assert_json_contains(&avro_dec.json_string, "\"precision\":25");
+        assert_json_contains(&avro_dec.json_string, "\"scale\":2");
+        let mut md = HashMap::new();
+        md.insert("logicalType".into(), "uuid".into());
+        let uuid_field =
+            ArrowField::new("id", DataType::FixedSizeBinary(16), 
false).with_metadata(md);
+        let uuid_schema = single_field_schema(uuid_field);
+        let avro_uuid = AvroSchema::try_from(&uuid_schema).unwrap();
+        assert_json_contains(&avro_uuid.json_string, 
"\"logicalType\":\"uuid\"");
+    }
+
+    #[test]
+    fn test_interval_duration() {
+        let interval_field = ArrowField::new(
+            "span",
+            DataType::Interval(IntervalUnit::MonthDayNano),
+            false,
+        );
+        let s = single_field_schema(interval_field);
+        let avro = AvroSchema::try_from(&s).unwrap();
+        assert_json_contains(&avro.json_string, 
"\"logicalType\":\"duration\"");
+        assert_json_contains(&avro.json_string, "\"size\":12");
+        let dur_field = ArrowField::new("latency", 
DataType::Duration(TimeUnit::Nanosecond), false);
+        let s2 = single_field_schema(dur_field);
+        let avro2 = AvroSchema::try_from(&s2).unwrap();
+        assert_json_contains(&avro2.json_string, "\"arrowDurationUnit\"");
+    }
+
+    #[test]
+    fn test_complex_types() {
+        let list_dt = DataType::List(Arc::new(ArrowField::new("item", 
DataType::Int32, true)));
+        let list_schema = single_field_schema(ArrowField::new("numbers", 
list_dt, false));
+        let avro_list = AvroSchema::try_from(&list_schema).unwrap();
+        assert_json_contains(&avro_list.json_string, "\"type\":\"array\"");
+        assert_json_contains(&avro_list.json_string, "\"items\"");
+        let value_field = ArrowField::new("value", DataType::Boolean, true);
+        let entries_struct = ArrowField::new(
+            "entries",
+            DataType::Struct(Fields::from(vec![
+                ArrowField::new("key", DataType::Utf8, false),
+                value_field.clone(),
+            ])),
+            false,
+        );
+        let map_dt = DataType::Map(Arc::new(entries_struct), false);
+        let map_schema = single_field_schema(ArrowField::new("props", map_dt, 
false));
+        let avro_map = AvroSchema::try_from(&map_schema).unwrap();
+        assert_json_contains(&avro_map.json_string, "\"type\":\"map\"");
+        assert_json_contains(&avro_map.json_string, "\"values\"");
+        let struct_dt = DataType::Struct(Fields::from(vec![
+            ArrowField::new("f1", DataType::Int64, false),
+            ArrowField::new("f2", DataType::Utf8, true),
+        ]));
+        let struct_schema = single_field_schema(ArrowField::new("person", 
struct_dt, true));
+        let avro_struct = AvroSchema::try_from(&struct_schema).unwrap();
+        assert_json_contains(&avro_struct.json_string, "\"type\":\"record\"");
+        assert_json_contains(&avro_struct.json_string, "\"null\"");
+    }
+
+    #[test]
+    fn test_enum_dictionary() {
+        let mut md = HashMap::new();
+        md.insert(
+            AVRO_ENUM_SYMBOLS_METADATA_KEY.into(),
+            "[\"OPEN\",\"CLOSED\"]".into(),
+        );
+        let enum_dt = DataType::Dictionary(Box::new(DataType::Int32), 
Box::new(DataType::Utf8));
+        let field = ArrowField::new("status", enum_dt, 
false).with_metadata(md);
+        let schema = single_field_schema(field);
+        let avro = AvroSchema::try_from(&schema).unwrap();
+        assert_json_contains(&avro.json_string, "\"type\":\"enum\"");
+        assert_json_contains(&avro.json_string, 
"\"symbols\":[\"OPEN\",\"CLOSED\"]");
+    }
+
+    #[test]
+    fn test_run_end_encoded() {
+        let ree_dt = DataType::RunEndEncoded(
+            Arc::new(ArrowField::new("run_ends", DataType::Int32, false)),
+            Arc::new(ArrowField::new("values", DataType::Utf8, false)),
+        );
+        let s = single_field_schema(ArrowField::new("text", ree_dt, false));
+        let avro = AvroSchema::try_from(&s).unwrap();
+        assert_json_contains(&avro.json_string, "\"string\"");
+    }
+
+    #[test]
+    fn test_dense_union_error() {
+        use arrow_schema::UnionFields;
+        let uf: UnionFields = vec![(0i8, Arc::new(ArrowField::new("a", 
DataType::Int32, false)))]
+            .into_iter()
+            .collect();
+        let union_dt = DataType::Union(uf, arrow_schema::UnionMode::Dense);
+        let s = single_field_schema(ArrowField::new("u", union_dt, false));
+        let err = AvroSchema::try_from(&s).unwrap_err();
+        assert!(err
+            .to_string()
+            .contains("Arrow Union to Avro Union not yet supported"));
+    }
+
+    #[test]
+    fn round_trip_primitive() {
+        let arrow_schema = ArrowSchema::new(vec![ArrowField::new("f1", 
DataType::Int32, false)]);
+        let avro_schema = AvroSchema::try_from(&arrow_schema).unwrap();
+        let decoded = avro_schema.schema().unwrap();
+        assert!(matches!(decoded, Schema::Complex(_)));
+    }
+
+    #[test]
+    fn test_name_generator_sanitization_and_uniqueness() {
+        let f1 = ArrowField::new("weird-name", DataType::FixedSizeBinary(8), 
false);
+        let f2 = ArrowField::new("weird name", DataType::FixedSizeBinary(8), 
false);
+        let f3 = ArrowField::new("123bad", DataType::FixedSizeBinary(8), 
false);
+        let arrow_schema = ArrowSchema::new(vec![f1, f2, f3]);
+        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
+        assert_json_contains(&avro.json_string, "\"name\":\"weird_name\"");
+        assert_json_contains(&avro.json_string, "\"name\":\"weird_name_1\"");
+        assert_json_contains(&avro.json_string, "\"name\":\"_123bad\"");
+    }
+
+    #[test]
+    fn test_date64_logical_type_mapping() {
+        let field = ArrowField::new("d", DataType::Date64, true);
+        let schema = single_field_schema(field);
+        let avro = AvroSchema::try_from(&schema).unwrap();
+        assert_json_contains(
+            &avro.json_string,
+            "\"logicalType\":\"local-timestamp-millis\"",
+        );
+    }
+
+    #[test]
+    fn test_duration_list_extras_propagated() {
+        let child = ArrowField::new("lat", 
DataType::Duration(TimeUnit::Microsecond), false);
+        let list_dt = DataType::List(Arc::new(child));
+        let arrow_schema = single_field_schema(ArrowField::new("durations", 
list_dt, false));
+        let avro = AvroSchema::try_from(&arrow_schema).unwrap();
+        assert_json_contains(&avro.json_string, 
"\"arrowDurationUnit\":\"microsecond\"");
+    }
+
+    #[test]
+    fn test_interval_yearmonth_extra() {
+        let field = ArrowField::new("iv", 
DataType::Interval(IntervalUnit::YearMonth), false);
+        let schema = single_field_schema(field);
+        let avro = AvroSchema::try_from(&schema).unwrap();
+        assert_json_contains(&avro.json_string, 
"\"arrowIntervalUnit\":\"yearmonth\"");
+    }
+
+    #[test]
+    fn test_interval_daytime_extra() {
+        let field = ArrowField::new("iv_dt", 
DataType::Interval(IntervalUnit::DayTime), false);
+        let schema = single_field_schema(field);
+        let avro = AvroSchema::try_from(&schema).unwrap();
+        assert_json_contains(&avro.json_string, 
"\"arrowIntervalUnit\":\"daytime\"");
+    }
+
+    #[test]
+    fn test_fixed_size_list_extra() {
+        let child = ArrowField::new("item", DataType::Int32, false);
+        let dt = DataType::FixedSizeList(Arc::new(child), 3);
+        let schema = single_field_schema(ArrowField::new("triples", dt, 
false));
+        let avro = AvroSchema::try_from(&schema).unwrap();
+        assert_json_contains(&avro.json_string, "\"arrowFixedSize\":3");
+    }
+
+    #[test]
+    fn test_map_duration_value_extra() {
+        let val_field = ArrowField::new("value", 
DataType::Duration(TimeUnit::Second), true);
+        let entries_struct = ArrowField::new(
+            "entries",
+            DataType::Struct(Fields::from(vec![
+                ArrowField::new("key", DataType::Utf8, false),
+                val_field,
+            ])),
+            false,
+        );
+        let map_dt = DataType::Map(Arc::new(entries_struct), false);
+        let schema = single_field_schema(ArrowField::new("metrics", map_dt, 
false));
+        let avro = AvroSchema::try_from(&schema).unwrap();
+        assert_json_contains(&avro.json_string, 
"\"arrowDurationUnit\":\"second\"");
+    }
 }


Reply via email to