This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch extract-schema-parser-to-its-own-module
in repository https://gitbox.apache.org/repos/asf/avro-rs.git

commit 971bac9e0871c48205ec17f1df5f6f963ea9a3cc
Author: Martin Tzvetanov Grigorov <[email protected]>
AuthorDate: Tue Jan 20 09:47:01 2026 +0200

    chore: Extract Parser to its own module (src/schema/parser.rs)
    
    Minor internal API changes to fix the build.
    No API breaks!
---
 avro/src/schema/mod.rs          | 842 +-------------------------------------
 avro/src/schema/parser.rs       | 881 ++++++++++++++++++++++++++++++++++++++++
 avro/src/schema/record/field.rs |   2 +-
 3 files changed, 892 insertions(+), 833 deletions(-)

diff --git a/avro/src/schema/mod.rs b/avro/src/schema/mod.rs
index 42a0b80..d915f46 100644
--- a/avro/src/schema/mod.rs
+++ b/avro/src/schema/mod.rs
@@ -21,10 +21,9 @@ use crate::{
     error::{Details, Error},
     schema_equality, types,
     util::MapHelper,
-    validator::{validate_enum_symbol_name, validate_namespace, 
validate_schema_name},
+    validator::{validate_namespace, validate_schema_name},
 };
 use digest::Digest;
-use log::{debug, error, warn};
 use serde::{
     Deserialize, Serialize, Serializer,
     ser::{SerializeMap, SerializeSeq},
@@ -47,6 +46,8 @@ pub use record::{
 };
 mod union;
 pub use union::UnionSchema;
+mod parser;
+use parser::Parser;
 
 /// Represents an Avro schema fingerprint
 /// More information about Avro schema fingerprints can be found in the
@@ -780,37 +781,6 @@ type DecimalMetadata = usize;
 pub(crate) type Precision = DecimalMetadata;
 pub(crate) type Scale = DecimalMetadata;
 
-fn parse_json_integer_for_decimal(value: &serde_json::Number) -> 
Result<DecimalMetadata, Error> {
-    Ok(if value.is_u64() {
-        let num = value
-            .as_u64()
-            .ok_or_else(|| Details::GetU64FromJson(value.clone()))?;
-        num.try_into()
-            .map_err(|e| Details::ConvertU64ToUsize(e, num))?
-    } else if value.is_i64() {
-        let num = value
-            .as_i64()
-            .ok_or_else(|| Details::GetI64FromJson(value.clone()))?;
-        num.try_into()
-            .map_err(|e| Details::ConvertI64ToUsize(e, num))?
-    } else {
-        return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into());
-    })
-}
-
-#[derive(Default)]
-pub(crate) struct Parser {
-    input_schemas: HashMap<Name, Value>,
-    /// A map of name -> Schema::Ref
-    /// Used to resolve cyclic references, i.e. when a
-    /// field's type is a reference to its record's type
-    resolving_schemas: Names,
-    input_order: Vec<Name>,
-    /// A map of name -> fully parsed Schema
-    /// Used to avoid parsing the same schema twice
-    parsed_schemas: Names,
-}
-
 impl Schema {
     /// Converts `self` into its [Parsing Canonical Form].
     ///
@@ -880,12 +850,11 @@ impl Schema {
                 return Err(Details::GetNameField.into());
             }
         }
-        let mut parser = Parser {
+        let mut parser = Parser::new(
             input_schemas,
-            resolving_schemas: HashMap::default(),
             input_order,
-            parsed_schemas: HashMap::with_capacity(input_len),
-        };
+            HashMap::with_capacity(input_len),
+        );
         parser.parse_list()
     }
 
@@ -922,12 +891,11 @@ impl Schema {
                 return Err(Details::GetNameField.into());
             }
         }
-        let mut parser = Parser {
+        let mut parser = Parser::new(
             input_schemas,
-            resolving_schemas: HashMap::default(),
             input_order,
-            parsed_schemas: HashMap::with_capacity(schemata_len),
-        };
+            HashMap::with_capacity(schemata_len),
+        );
         parser.parse_input_schemas()?;
 
         let value = 
serde_json::from_str(schema).map_err(Details::ParseSchemaJson)?;
@@ -954,12 +922,7 @@ impl Schema {
     /// Parses an Avro schema from JSON.
     /// Any `Schema::Ref`s must be known in the `names` map.
     pub(crate) fn parse_with_names(value: &Value, names: Names) -> 
AvroResult<Schema> {
-        let mut parser = Parser {
-            input_schemas: HashMap::with_capacity(1),
-            resolving_schemas: Names::default(),
-            input_order: Vec::with_capacity(1),
-            parsed_schemas: names,
-        };
+        let mut parser = Parser::new(HashMap::with_capacity(1), 
Vec::with_capacity(1), names);
         parser.parse(value, &None)
     }
 
@@ -1129,791 +1092,6 @@ impl Schema {
     }
 }
 
-impl Parser {
-    /// Create a `Schema` from a string representing a JSON Avro schema.
-    fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
-        let value = 
serde_json::from_str(input).map_err(Details::ParseSchemaJson)?;
-        self.parse(&value, &None)
-    }
-
-    /// Create an array of `Schema`'s from an iterator of JSON Avro schemas. 
It is allowed that
-    /// the schemas have cross-dependencies; these will be resolved during 
parsing.
-    fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
-        self.parse_input_schemas()?;
-
-        let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
-        for name in self.input_order.drain(0..) {
-            let parsed = self
-                .parsed_schemas
-                .remove(&name)
-                .expect("One of the input schemas was unexpectedly not 
parsed");
-            parsed_schemas.push(parsed);
-        }
-        Ok(parsed_schemas)
-    }
-
-    /// Convert the input schemas to parsed_schemas
-    fn parse_input_schemas(&mut self) -> Result<(), Error> {
-        while !self.input_schemas.is_empty() {
-            let next_name = self
-                .input_schemas
-                .keys()
-                .next()
-                .expect("Input schemas unexpectedly empty")
-                .to_owned();
-            let (name, value) = self
-                .input_schemas
-                .remove_entry(&next_name)
-                .expect("Key unexpectedly missing");
-            let parsed = self.parse(&value, &None)?;
-            self.parsed_schemas
-                .insert(get_schema_type_name(name, value), parsed);
-        }
-        Ok(())
-    }
-
-    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
-    /// schema.
-    fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> 
AvroResult<Schema> {
-        match *value {
-            Value::String(ref t) => self.parse_known_schema(t.as_str(), 
enclosing_namespace),
-            Value::Object(ref data) => {
-                self.parse_complex(data, enclosing_namespace, 
RecordSchemaParseLocation::Root)
-            }
-            Value::Array(ref data) => self.parse_union(data, 
enclosing_namespace),
-            _ => Err(Details::ParseSchemaFromValidJson.into()),
-        }
-    }
-
-    /// Parse a `serde_json::Value` representing an Avro type whose Schema is 
known into a
-    /// `Schema`. A Schema for a `serde_json::Value` is known if it is 
primitive or has
-    /// been parsed previously by the parsed and stored in its map of 
parsed_schemas.
-    fn parse_known_schema(
-        &mut self,
-        name: &str,
-        enclosing_namespace: &Namespace,
-    ) -> AvroResult<Schema> {
-        match name {
-            "null" => Ok(Schema::Null),
-            "boolean" => Ok(Schema::Boolean),
-            "int" => Ok(Schema::Int),
-            "long" => Ok(Schema::Long),
-            "double" => Ok(Schema::Double),
-            "float" => Ok(Schema::Float),
-            "bytes" => Ok(Schema::Bytes),
-            "string" => Ok(Schema::String),
-            _ => self.fetch_schema_ref(name, enclosing_namespace),
-        }
-    }
-
-    /// Given a name, tries to retrieve the parsed schema from 
`parsed_schemas`.
-    /// If a parsed schema is not found, it checks if a currently resolving
-    /// schema with that name exists.
-    /// If a resolving schema is not found, it checks if a json with that name 
exists
-    /// in `input_schemas` and then parses it (removing it from 
`input_schemas`)
-    /// and adds the parsed schema to `parsed_schemas`.
-    ///
-    /// This method allows schemas definitions that depend on other types to
-    /// parse their dependencies (or look them up if already parsed).
-    fn fetch_schema_ref(
-        &mut self,
-        name: &str,
-        enclosing_namespace: &Namespace,
-    ) -> AvroResult<Schema> {
-        fn get_schema_ref(parsed: &Schema) -> Schema {
-            match parsed {
-                &Schema::Record(RecordSchema { ref name, .. })
-                | &Schema::Enum(EnumSchema { ref name, .. })
-                | &Schema::Fixed(FixedSchema { ref name, .. }) => {
-                    Schema::Ref { name: name.clone() }
-                }
-                _ => parsed.clone(),
-            }
-        }
-
-        let name = Name::new(name)?;
-        let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
-
-        if self.parsed_schemas.contains_key(&fully_qualified_name) {
-            return Ok(Schema::Ref {
-                name: fully_qualified_name,
-            });
-        }
-        if let Some(resolving_schema) = 
self.resolving_schemas.get(&fully_qualified_name) {
-            return Ok(resolving_schema.clone());
-        }
-
-        // For good error reporting we add this check
-        match name.name.as_str() {
-            "record" | "enum" | "fixed" => {
-                return 
Err(Details::InvalidSchemaRecord(name.to_string()).into());
-            }
-            _ => (),
-        }
-
-        let value = self
-            .input_schemas
-            .remove(&fully_qualified_name)
-            // TODO make a better descriptive error message here that conveys 
that a named schema cannot be found
-            .ok_or_else(|| 
Details::ParsePrimitive(fully_qualified_name.fullname(None)))?;
-
-        // parsing a full schema from inside another schema. Other full schema 
will not inherit namespace
-        let parsed = self.parse(&value, &None)?;
-        self.parsed_schemas
-            .insert(get_schema_type_name(name, value), parsed.clone());
-
-        Ok(get_schema_ref(&parsed))
-    }
-
-    fn parse_precision_and_scale(
-        complex: &Map<String, Value>,
-    ) -> Result<(Precision, Scale), Error> {
-        fn get_decimal_integer(
-            complex: &Map<String, Value>,
-            key: &'static str,
-        ) -> Result<DecimalMetadata, Error> {
-            match complex.get(key) {
-                Some(Value::Number(value)) => 
parse_json_integer_for_decimal(value),
-                None => {
-                    if key == "scale" {
-                        Ok(0)
-                    } else {
-                        Err(Details::GetDecimalMetadataFromJson(key).into())
-                    }
-                }
-                Some(value) => Err(Details::GetDecimalMetadataValueFromJson {
-                    key: key.into(),
-                    value: value.clone(),
-                }
-                .into()),
-            }
-        }
-        let precision = get_decimal_integer(complex, "precision")?;
-        let scale = get_decimal_integer(complex, "scale")?;
-
-        if precision < 1 {
-            return Err(Details::DecimalPrecisionMuBePositive { precision 
}.into());
-        }
-
-        if precision < scale {
-            Err(Details::DecimalPrecisionLessThanScale { precision, scale 
}.into())
-        } else {
-            Ok((precision, scale))
-        }
-    }
-
-    /// Parse a `serde_json::Value` representing a complex Avro type into a
-    /// `Schema`.
-    ///
-    /// Avro supports "recursive" definition of types.
-    /// e.g: {"type": {"type": "string"}}
-    fn parse_complex(
-        &mut self,
-        complex: &Map<String, Value>,
-        enclosing_namespace: &Namespace,
-        parse_location: RecordSchemaParseLocation,
-    ) -> AvroResult<Schema> {
-        // Try to parse this as a native complex type.
-        fn parse_as_native_complex(
-            complex: &Map<String, Value>,
-            parser: &mut Parser,
-            enclosing_namespace: &Namespace,
-        ) -> AvroResult<Schema> {
-            match complex.get("type") {
-                Some(value) => match value {
-                    Value::String(s) if s == "fixed" => {
-                        parser.parse_fixed(complex, enclosing_namespace)
-                    }
-                    _ => parser.parse(value, enclosing_namespace),
-                },
-                None => Err(Details::GetLogicalTypeField.into()),
-            }
-        }
-
-        // This crate support some logical types natively, and this function 
tries to convert
-        // a native complex type with a logical type attribute to these 
logical types.
-        // This function:
-        // 1. Checks whether the native complex type is in the supported kinds.
-        // 2. If it is, using the convert function to convert the native 
complex type to
-        // a logical type.
-        fn try_convert_to_logical_type<F>(
-            logical_type: &str,
-            schema: Schema,
-            supported_schema_kinds: &[SchemaKind],
-            convert: F,
-        ) -> AvroResult<Schema>
-        where
-            F: Fn(Schema) -> AvroResult<Schema>,
-        {
-            let kind = SchemaKind::from(schema.clone());
-            if supported_schema_kinds.contains(&kind) {
-                convert(schema)
-            } else {
-                warn!(
-                    "Ignoring unknown logical type '{logical_type}' for schema 
of type: {schema:?}!"
-                );
-                Ok(schema)
-            }
-        }
-
-        match complex.get("logicalType") {
-            Some(Value::String(t)) => match t.as_str() {
-                "decimal" => {
-                    return try_convert_to_logical_type(
-                        "decimal",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Fixed, SchemaKind::Bytes],
-                        |inner| -> AvroResult<Schema> {
-                            match Self::parse_precision_and_scale(complex) {
-                                Ok((precision, scale)) => 
Ok(Schema::Decimal(DecimalSchema {
-                                    precision,
-                                    scale,
-                                    inner: inner.try_into()?,
-                                })),
-                                Err(err) => {
-                                    warn!("Ignoring invalid decimal logical 
type: {err}");
-                                    Ok(inner)
-                                }
-                            }
-                        },
-                    );
-                }
-                "big-decimal" => {
-                    return try_convert_to_logical_type(
-                        "big-decimal",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Bytes],
-                        |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
-                    );
-                }
-                "uuid" => {
-                    return try_convert_to_logical_type(
-                        "uuid",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::String, SchemaKind::Fixed, 
SchemaKind::Bytes],
-                        |schema| match schema {
-                            Schema::String => 
Ok(Schema::Uuid(UuidSchema::String)),
-                            Schema::Fixed(fixed @ FixedSchema { size: 16, .. 
}) => {
-                                Ok(Schema::Uuid(UuidSchema::Fixed(fixed)))
-                            }
-                            Schema::Fixed(FixedSchema { size, .. }) => {
-                                warn!(
-                                    "Ignoring uuid logical type for a Fixed 
schema because its size ({size:?}) is not 16! Schema: {schema:?}"
-                                );
-                                Ok(schema)
-                            }
-                            Schema::Bytes => 
Ok(Schema::Uuid(UuidSchema::Bytes)),
-                            _ => {
-                                warn!("Ignoring invalid uuid logical type for 
schema: {schema:?}");
-                                Ok(schema)
-                            }
-                        },
-                    );
-                }
-                "date" => {
-                    return try_convert_to_logical_type(
-                        "date",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Int],
-                        |_| -> AvroResult<Schema> { Ok(Schema::Date) },
-                    );
-                }
-                "time-millis" => {
-                    return try_convert_to_logical_type(
-                        "date",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Int],
-                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
-                    );
-                }
-                "time-micros" => {
-                    return try_convert_to_logical_type(
-                        "time-micros",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Long],
-                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
-                    );
-                }
-                "timestamp-millis" => {
-                    return try_convert_to_logical_type(
-                        "timestamp-millis",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Long],
-                        |_| -> AvroResult<Schema> { 
Ok(Schema::TimestampMillis) },
-                    );
-                }
-                "timestamp-micros" => {
-                    return try_convert_to_logical_type(
-                        "timestamp-micros",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Long],
-                        |_| -> AvroResult<Schema> { 
Ok(Schema::TimestampMicros) },
-                    );
-                }
-                "timestamp-nanos" => {
-                    return try_convert_to_logical_type(
-                        "timestamp-nanos",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Long],
-                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) 
},
-                    );
-                }
-                "local-timestamp-millis" => {
-                    return try_convert_to_logical_type(
-                        "local-timestamp-millis",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Long],
-                        |_| -> AvroResult<Schema> { 
Ok(Schema::LocalTimestampMillis) },
-                    );
-                }
-                "local-timestamp-micros" => {
-                    return try_convert_to_logical_type(
-                        "local-timestamp-micros",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Long],
-                        |_| -> AvroResult<Schema> { 
Ok(Schema::LocalTimestampMicros) },
-                    );
-                }
-                "local-timestamp-nanos" => {
-                    return try_convert_to_logical_type(
-                        "local-timestamp-nanos",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Long],
-                        |_| -> AvroResult<Schema> { 
Ok(Schema::LocalTimestampNanos) },
-                    );
-                }
-                "duration" => {
-                    return try_convert_to_logical_type(
-                        "duration",
-                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
-                        &[SchemaKind::Fixed],
-                        |schema| -> AvroResult<Schema> {
-                            match schema {
-                                Schema::Fixed(fixed @ FixedSchema { size: 12, 
.. }) => {
-                                    Ok(Schema::Duration(fixed))
-                                }
-                                Schema::Fixed(FixedSchema { size, .. }) => {
-                                    warn!(
-                                        "Ignoring duration logical type on 
fixed type because size ({size}) is not 12! Schema: {schema:?}"
-                                    );
-                                    Ok(schema)
-                                }
-                                _ => {
-                                    warn!(
-                                        "Ignoring invalid duration logical 
type for schema: {schema:?}"
-                                    );
-                                    Ok(schema)
-                                }
-                            }
-                        },
-                    );
-                }
-                // In this case, of an unknown logical type, we just pass 
through the underlying
-                // type.
-                _ => {}
-            },
-            // The spec says to ignore invalid logical types and just pass 
through the
-            // underlying type. It is unclear whether that applies to this 
case or not, where the
-            // `logicalType` is not a string.
-            Some(value) => return 
Err(Details::GetLogicalTypeFieldType(value.clone()).into()),
-            _ => {}
-        }
-        match complex.get("type") {
-            Some(Value::String(t)) => match t.as_str() {
-                "record" => match parse_location {
-                    RecordSchemaParseLocation::Root => {
-                        self.parse_record(complex, enclosing_namespace)
-                    }
-                    RecordSchemaParseLocation::FromField => {
-                        self.fetch_schema_ref(t, enclosing_namespace)
-                    }
-                },
-                "enum" => self.parse_enum(complex, enclosing_namespace),
-                "array" => self.parse_array(complex, enclosing_namespace),
-                "map" => self.parse_map(complex, enclosing_namespace),
-                "fixed" => self.parse_fixed(complex, enclosing_namespace),
-                other => self.parse_known_schema(other, enclosing_namespace),
-            },
-            Some(Value::Object(data)) => {
-                self.parse_complex(data, enclosing_namespace, 
RecordSchemaParseLocation::Root)
-            }
-            Some(Value::Array(variants)) => self.parse_union(variants, 
enclosing_namespace),
-            Some(unknown) => 
Err(Details::GetComplexType(unknown.clone()).into()),
-            None => Err(Details::GetComplexTypeField.into()),
-        }
-    }
-
-    fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
-        let resolving_schema = Schema::Ref { name: name.clone() };
-        self.resolving_schemas
-            .insert(name.clone(), resolving_schema.clone());
-
-        let namespace = &name.namespace;
-
-        if let Some(aliases) = aliases {
-            aliases.iter().for_each(|alias| {
-                let alias_fullname = alias.fully_qualified_name(namespace);
-                self.resolving_schemas
-                    .insert(alias_fullname, resolving_schema.clone());
-            });
-        }
-    }
-
-    fn register_parsed_schema(
-        &mut self,
-        fully_qualified_name: &Name,
-        schema: &Schema,
-        aliases: &Aliases,
-    ) {
-        // FIXME, this should be globally aware, so if there is something 
overwriting something
-        // else then there is an ambiguous schema definition. An appropriate 
error should be thrown
-        self.parsed_schemas
-            .insert(fully_qualified_name.clone(), schema.clone());
-        self.resolving_schemas.remove(fully_qualified_name);
-
-        let namespace = &fully_qualified_name.namespace;
-
-        if let Some(aliases) = aliases {
-            aliases.iter().for_each(|alias| {
-                let alias_fullname = alias.fully_qualified_name(namespace);
-                self.resolving_schemas.remove(&alias_fullname);
-                self.parsed_schemas.insert(alias_fullname, schema.clone());
-            });
-        }
-    }
-
-    /// Returns already parsed schema or a schema that is currently being 
resolved.
-    fn get_already_seen_schema(
-        &self,
-        complex: &Map<String, Value>,
-        enclosing_namespace: &Namespace,
-    ) -> Option<&Schema> {
-        match complex.get("type") {
-            Some(Value::String(typ)) => {
-                let name = Name::new(typ.as_str())
-                    .unwrap()
-                    .fully_qualified_name(enclosing_namespace);
-                self.resolving_schemas
-                    .get(&name)
-                    .or_else(|| self.parsed_schemas.get(&name))
-            }
-            _ => None,
-        }
-    }
-
-    /// Parse a `serde_json::Value` representing a Avro record type into a
-    /// `Schema`.
-    fn parse_record(
-        &mut self,
-        complex: &Map<String, Value>,
-        enclosing_namespace: &Namespace,
-    ) -> AvroResult<Schema> {
-        let fields_opt = complex.get("fields");
-
-        if fields_opt.is_none()
-            && let Some(seen) = self.get_already_seen_schema(complex, 
enclosing_namespace)
-        {
-            return Ok(seen.clone());
-        }
-
-        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
-        let aliases = fix_aliases_namespace(complex.aliases(), 
&fully_qualified_name.namespace);
-
-        let mut lookup = BTreeMap::new();
-
-        self.register_resolving_schema(&fully_qualified_name, &aliases);
-
-        debug!("Going to parse record schema: {:?}", &fully_qualified_name);
-
-        let fields: Vec<RecordField> = fields_opt
-            .and_then(|fields| fields.as_array())
-            .ok_or_else(|| Error::new(Details::GetRecordFieldsJson))
-            .and_then(|fields| {
-                fields
-                    .iter()
-                    .filter_map(|field| field.as_object())
-                    .enumerate()
-                    .map(|(position, field)| {
-                        RecordField::parse(field, position, self, 
&fully_qualified_name)
-                    })
-                    .collect::<Result<_, _>>()
-            })?;
-
-        for field in &fields {
-            if let Some(_old) = lookup.insert(field.name.clone(), 
field.position) {
-                return 
Err(Details::FieldNameDuplicate(field.name.clone()).into());
-            }
-
-            if let Some(ref field_aliases) = field.aliases {
-                for alias in field_aliases {
-                    lookup.insert(alias.clone(), field.position);
-                }
-            }
-        }
-
-        let schema = Schema::Record(RecordSchema {
-            name: fully_qualified_name.clone(),
-            aliases: aliases.clone(),
-            doc: complex.doc(),
-            fields,
-            lookup,
-            attributes: self.get_custom_attributes(complex, vec!["fields"]),
-        });
-
-        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
-        Ok(schema)
-    }
-
-    fn get_custom_attributes(
-        &self,
-        complex: &Map<String, Value>,
-        excluded: Vec<&'static str>,
-    ) -> BTreeMap<String, Value> {
-        let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
-        for (key, value) in complex {
-            match key.as_str() {
-                "type" | "name" | "namespace" | "doc" | "aliases" | 
"logicalType" => continue,
-                candidate if excluded.contains(&candidate) => continue,
-                _ => custom_attributes.insert(key.clone(), value.clone()),
-            };
-        }
-        custom_attributes
-    }
-
-    /// Parse a `serde_json::Value` representing a Avro enum type into a
-    /// `Schema`.
-    fn parse_enum(
-        &mut self,
-        complex: &Map<String, Value>,
-        enclosing_namespace: &Namespace,
-    ) -> AvroResult<Schema> {
-        let symbols_opt = complex.get("symbols");
-
-        if symbols_opt.is_none()
-            && let Some(seen) = self.get_already_seen_schema(complex, 
enclosing_namespace)
-        {
-            return Ok(seen.clone());
-        }
-
-        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
-        let aliases = fix_aliases_namespace(complex.aliases(), 
&fully_qualified_name.namespace);
-
-        let symbols: Vec<String> = symbols_opt
-            .and_then(|v| v.as_array())
-            .ok_or_else(|| Error::from(Details::GetEnumSymbolsField))
-            .and_then(|symbols| {
-                symbols
-                    .iter()
-                    .map(|symbol| symbol.as_str().map(|s| s.to_string()))
-                    .collect::<Option<_>>()
-                    .ok_or_else(|| Error::from(Details::GetEnumSymbols))
-            })?;
-
-        let mut existing_symbols: HashSet<&String> = 
HashSet::with_capacity(symbols.len());
-        for symbol in symbols.iter() {
-            validate_enum_symbol_name(symbol)?;
-
-            // Ensure there are no duplicate symbols
-            if existing_symbols.contains(&symbol) {
-                return 
Err(Details::EnumSymbolDuplicate(symbol.to_string()).into());
-            }
-
-            existing_symbols.insert(symbol);
-        }
-
-        let mut default: Option<String> = None;
-        if let Some(value) = complex.get("default") {
-            if let Value::String(ref s) = *value {
-                default = Some(s.clone());
-            } else {
-                return 
Err(Details::EnumDefaultWrongType(value.clone()).into());
-            }
-        }
-
-        if let Some(ref value) = default {
-            let resolved = types::Value::from(value.clone())
-                .resolve_enum(&symbols, &Some(value.to_string()), &None)
-                .is_ok();
-            if !resolved {
-                return Err(Details::GetEnumDefault {
-                    symbol: value.to_string(),
-                    symbols,
-                }
-                .into());
-            }
-        }
-
-        let schema = Schema::Enum(EnumSchema {
-            name: fully_qualified_name.clone(),
-            aliases: aliases.clone(),
-            doc: complex.doc(),
-            symbols,
-            default,
-            attributes: self.get_custom_attributes(complex, vec!["symbols"]),
-        });
-
-        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
-
-        Ok(schema)
-    }
-
-    /// Parse a `serde_json::Value` representing a Avro array type into a
-    /// `Schema`.
-    fn parse_array(
-        &mut self,
-        complex: &Map<String, Value>,
-        enclosing_namespace: &Namespace,
-    ) -> AvroResult<Schema> {
-        complex
-            .get("items")
-            .ok_or_else(|| Details::GetArrayItemsField.into())
-            .and_then(|items| self.parse(items, enclosing_namespace))
-            .map(|items| {
-                Schema::array_with_attributes(
-                    items,
-                    self.get_custom_attributes(complex, vec!["items"]),
-                )
-            })
-    }
-
-    /// Parse a `serde_json::Value` representing a Avro map type into a
-    /// `Schema`.
-    fn parse_map(
-        &mut self,
-        complex: &Map<String, Value>,
-        enclosing_namespace: &Namespace,
-    ) -> AvroResult<Schema> {
-        complex
-            .get("values")
-            .ok_or_else(|| Details::GetMapValuesField.into())
-            .and_then(|items| self.parse(items, enclosing_namespace))
-            .map(|items| {
-                Schema::map_with_attributes(
-                    items,
-                    self.get_custom_attributes(complex, vec!["values"]),
-                )
-            })
-    }
-
-    /// Parse a `serde_json::Value` representing a Avro union type into a
-    /// `Schema`.
-    fn parse_union(
-        &mut self,
-        items: &[Value],
-        enclosing_namespace: &Namespace,
-    ) -> AvroResult<Schema> {
-        items
-            .iter()
-            .map(|v| self.parse(v, enclosing_namespace))
-            .collect::<Result<Vec<_>, _>>()
-            .and_then(|schemas| {
-                if schemas.is_empty() {
-                    error!(
-                        "Union schemas should have at least two members! \
-                    Please enable debug logging to find out which Record 
schema \
-                    declares the union with 
'RUST_LOG=apache_avro::schema=debug'."
-                    );
-                } else if schemas.len() == 1 {
-                    warn!(
-                        "Union schema with just one member! Consider dropping 
the union! \
-                    Please enable debug logging to find out which Record 
schema \
-                    declares the union with 
'RUST_LOG=apache_avro::schema=debug'."
-                    );
-                }
-                Ok(Schema::Union(UnionSchema::new(schemas)?))
-            })
-    }
-
-    /// Parse a `serde_json::Value` representing a Avro fixed type into a
-    /// `Schema`.
-    fn parse_fixed(
-        &mut self,
-        complex: &Map<String, Value>,
-        enclosing_namespace: &Namespace,
-    ) -> AvroResult<Schema> {
-        let size_opt = complex.get("size");
-        if size_opt.is_none()
-            && let Some(seen) = self.get_already_seen_schema(complex, 
enclosing_namespace)
-        {
-            return Ok(seen.clone());
-        }
-
-        let doc = complex.get("doc").and_then(|v| match &v {
-            &Value::String(docstr) => Some(docstr.clone()),
-            _ => None,
-        });
-
-        let size = match size_opt {
-            Some(size) => size
-                .as_u64()
-                .ok_or_else(|| 
Details::GetFixedSizeFieldPositive(size.clone())),
-            None => Err(Details::GetFixedSizeField),
-        }?;
-
-        let default = complex.get("default").and_then(|v| match &v {
-            &Value::String(default) => Some(default.clone()),
-            _ => None,
-        });
-
-        if default.is_some() {
-            let len = default.clone().unwrap().len();
-            if len != size as usize {
-                return Err(Details::FixedDefaultLenSizeMismatch(len, 
size).into());
-            }
-        }
-
-        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
-        let aliases = fix_aliases_namespace(complex.aliases(), 
&fully_qualified_name.namespace);
-
-        let schema = Schema::Fixed(FixedSchema {
-            name: fully_qualified_name.clone(),
-            aliases: aliases.clone(),
-            doc,
-            size: size as usize,
-            default,
-            attributes: self.get_custom_attributes(complex, vec!["size"]),
-        });
-
-        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
-
-        Ok(schema)
-    }
-}
-
-// A type alias may be specified either as a fully namespace-qualified, or 
relative
-// to the namespace of the name it is an alias for. For example, if a type 
named "a.b"
-// has aliases of "c" and "x.y", then the fully qualified names of its aliases 
are "a.c"
-// and "x.y".
-// https://avro.apache.org/docs/current/specification/#aliases
-fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) 
-> Aliases {
-    aliases.map(|aliases| {
-        aliases
-            .iter()
-            .map(|alias| {
-                if alias.find('.').is_none() {
-                    match namespace {
-                        Some(ns) => format!("{ns}.{alias}"),
-                        None => alias.clone(),
-                    }
-                } else {
-                    alias.clone()
-                }
-            })
-            .map(|alias| Alias::new(alias.as_str()).unwrap())
-            .collect()
-    })
-}
-
-fn get_schema_type_name(name: Name, value: Value) -> Name {
-    match value.get("type") {
-        Some(Value::Object(complex_type)) => match complex_type.name() {
-            Some(name) => Name::new(name.as_str()).unwrap(),
-            _ => name,
-        },
-        _ => name,
-    }
-}
-
 impl Serialize for Schema {
     fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
     where
diff --git a/avro/src/schema/parser.rs b/avro/src/schema/parser.rs
new file mode 100644
index 0000000..ced79bc
--- /dev/null
+++ b/avro/src/schema/parser.rs
@@ -0,0 +1,881 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Details;
+use crate::schema::record::RecordSchemaParseLocation;
+use crate::schema::{
+    Alias, Aliases, DecimalMetadata, DecimalSchema, EnumSchema, FixedSchema, 
Name, Names,
+    Namespace, Precision, RecordField, RecordSchema, Scale, Schema, 
SchemaKind, UnionSchema,
+    UuidSchema,
+};
+use crate::types;
+use crate::util::MapHelper;
+use crate::validator::validate_enum_symbol_name;
+use crate::{AvroResult, Error};
+use log::{debug, error, warn};
+use serde_json::{Map, Value};
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+#[derive(Default)]
+pub(crate) struct Parser {
+    input_schemas: HashMap<Name, Value>,
+    /// A map of name -> Schema::Ref
+    /// Used to resolve cyclic references, i.e. when a
+    /// field's type is a reference to its record's type
+    resolving_schemas: Names,
+    input_order: Vec<Name>,
+    /// A map of name -> fully parsed Schema
+    /// Used to avoid parsing the same schema twice
+    parsed_schemas: Names,
+}
+
+impl Parser {
+    pub(crate) fn new(
+        input_schemas: HashMap<Name, Value>,
+        input_order: Vec<Name>,
+        parsed_schemas: Names,
+    ) -> Self {
+        Self {
+            input_schemas,
+            resolving_schemas: HashMap::default(),
+            input_order,
+            parsed_schemas,
+        }
+    }
+
+    pub(crate) fn get_parsed_schemas(&mut self) -> &Names {
+        &self.parsed_schemas
+    }
+
+    /// Create a `Schema` from a string representing a JSON Avro schema.
+    pub(super) fn parse_str(&mut self, input: &str) -> AvroResult<Schema> {
+        let value = 
serde_json::from_str(input).map_err(Details::ParseSchemaJson)?;
+        self.parse(&value, &None)
+    }
+
+    /// Create an array of `Schema`'s from an iterator of JSON Avro schemas. 
It is allowed that
+    /// the schemas have cross-dependencies; these will be resolved during 
parsing.
+    pub(super) fn parse_list(&mut self) -> AvroResult<Vec<Schema>> {
+        self.parse_input_schemas()?;
+
+        let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
+        for name in self.input_order.drain(0..) {
+            let parsed = self
+                .parsed_schemas
+                .remove(&name)
+                .expect("One of the input schemas was unexpectedly not 
parsed");
+            parsed_schemas.push(parsed);
+        }
+        Ok(parsed_schemas)
+    }
+
+    /// Convert the input schemas to parsed_schemas
+    pub(super) fn parse_input_schemas(&mut self) -> Result<(), Error> {
+        while !self.input_schemas.is_empty() {
+            let next_name = self
+                .input_schemas
+                .keys()
+                .next()
+                .expect("Input schemas unexpectedly empty")
+                .to_owned();
+            let (name, value) = self
+                .input_schemas
+                .remove_entry(&next_name)
+                .expect("Key unexpectedly missing");
+            let parsed = self.parse(&value, &None)?;
+            self.parsed_schemas
+                .insert(self.get_schema_type_name(name, value), parsed);
+        }
+        Ok(())
+    }
+
+    /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
+    /// schema.
+    pub(super) fn parse(
+        &mut self,
+        value: &Value,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        match *value {
+            Value::String(ref t) => self.parse_known_schema(t.as_str(), 
enclosing_namespace),
+            Value::Object(ref data) => {
+                self.parse_complex(data, enclosing_namespace, 
RecordSchemaParseLocation::Root)
+            }
+            Value::Array(ref data) => self.parse_union(data, 
enclosing_namespace),
+            _ => Err(Details::ParseSchemaFromValidJson.into()),
+        }
+    }
+
+    /// Resolve a schema given by its type name. Primitive type names map directly to
+    /// their `Schema`; any other name is looked up via `fetch_schema_ref`, i.e. it must
+    /// have been parsed previously by the parser or be available among its input schemas.
+    fn parse_known_schema(
+        &mut self,
+        name: &str,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        match name {
+            "null" => Ok(Schema::Null),
+            "boolean" => Ok(Schema::Boolean),
+            "int" => Ok(Schema::Int),
+            "long" => Ok(Schema::Long),
+            "double" => Ok(Schema::Double),
+            "float" => Ok(Schema::Float),
+            "bytes" => Ok(Schema::Bytes),
+            "string" => Ok(Schema::String),
+            _ => self.fetch_schema_ref(name, enclosing_namespace),
+        }
+    }
+
+    /// Given a name, tries to retrieve the parsed schema from 
`parsed_schemas`.
+    /// If a parsed schema is not found, it checks if a currently resolving
+    /// schema with that name exists.
+    /// If a resolving schema is not found, it checks if a json with that name 
exists
+    /// in `input_schemas` and then parses it (removing it from 
`input_schemas`)
+    /// and adds the parsed schema to `parsed_schemas`.
+    ///
+    /// This method allows schemas definitions that depend on other types to
+    /// parse their dependencies (or look them up if already parsed).
+    pub(super) fn fetch_schema_ref(
+        &mut self,
+        name: &str,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        fn get_schema_ref(parsed: &Schema) -> Schema {
+            match parsed {
+                &Schema::Record(RecordSchema { ref name, .. })
+                | &Schema::Enum(EnumSchema { ref name, .. })
+                | &Schema::Fixed(FixedSchema { ref name, .. }) => {
+                    Schema::Ref { name: name.clone() }
+                }
+                _ => parsed.clone(),
+            }
+        }
+
+        let name = Name::new(name)?;
+        let fully_qualified_name = 
name.fully_qualified_name(enclosing_namespace);
+
+        if self.parsed_schemas.contains_key(&fully_qualified_name) {
+            return Ok(Schema::Ref {
+                name: fully_qualified_name,
+            });
+        }
+        if let Some(resolving_schema) = 
self.resolving_schemas.get(&fully_qualified_name) {
+            return Ok(resolving_schema.clone());
+        }
+
+        // For good error reporting we add this check
+        match name.name.as_str() {
+            "record" | "enum" | "fixed" => {
+                return 
Err(Details::InvalidSchemaRecord(name.to_string()).into());
+            }
+            _ => (),
+        }
+
+        let value = self
+            .input_schemas
+            .remove(&fully_qualified_name)
+            // TODO make a better descriptive error message here that conveys 
that a named schema cannot be found
+            .ok_or_else(|| 
Details::ParsePrimitive(fully_qualified_name.fullname(None)))?;
+
+        // parsing a full schema from inside another schema. Other full schema 
will not inherit namespace
+        let parsed = self.parse(&value, &None)?;
+        self.parsed_schemas
+            .insert(self.get_schema_type_name(name, value), parsed.clone());
+
+        Ok(get_schema_ref(&parsed))
+    }
+
+    fn get_decimal_integer(
+        &self,
+        complex: &Map<String, Value>,
+        key: &'static str,
+    ) -> AvroResult<DecimalMetadata> {
+        match complex.get(key) {
+            Some(Value::Number(value)) => 
self.parse_json_integer_for_decimal(value),
+            None => {
+                if key == "scale" {
+                    Ok(0)
+                } else {
+                    Err(Details::GetDecimalMetadataFromJson(key).into())
+                }
+            }
+            Some(value) => Err(Details::GetDecimalMetadataValueFromJson {
+                key: key.into(),
+                value: value.clone(),
+            }
+            .into()),
+        }
+    }
+
+    fn parse_precision_and_scale(
+        &self,
+        complex: &Map<String, Value>,
+    ) -> AvroResult<(Precision, Scale)> {
+        let precision = self.get_decimal_integer(complex, "precision")?;
+        let scale = self.get_decimal_integer(complex, "scale")?;
+
+        if precision < 1 {
+            return Err(Details::DecimalPrecisionMuBePositive { precision 
}.into());
+        }
+
+        if precision < scale {
+            Err(Details::DecimalPrecisionLessThanScale { precision, scale 
}.into())
+        } else {
+            Ok((precision, scale))
+        }
+    }
+
+    /// Parse a `serde_json::Value` representing a complex Avro type into a
+    /// `Schema`.
+    ///
+    /// Avro supports "recursive" definition of types.
+    /// e.g: {"type": {"type": "string"}}
+    pub(super) fn parse_complex(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+        parse_location: RecordSchemaParseLocation,
+    ) -> AvroResult<Schema> {
+        // Try to parse this as a native complex type.
+        fn parse_as_native_complex(
+            complex: &Map<String, Value>,
+            parser: &mut Parser,
+            enclosing_namespace: &Namespace,
+        ) -> AvroResult<Schema> {
+            match complex.get("type") {
+                Some(value) => match value {
+                    Value::String(s) if s == "fixed" => {
+                        parser.parse_fixed(complex, enclosing_namespace)
+                    }
+                    _ => parser.parse(value, enclosing_namespace),
+                },
+                None => Err(Details::GetLogicalTypeField.into()),
+            }
+        }
+
+        // This crate supports some logical types natively, and this function 
tries to convert
+        // a native complex type with a logical type attribute to these 
logical types.
+        // This function:
+        // 1. Checks whether the native complex type is in the supported kinds.
+        // 2. If it is, uses the convert function to convert the native 
complex type to
+        // a logical type.
+        fn try_convert_to_logical_type<F>(
+            logical_type: &str,
+            schema: Schema,
+            supported_schema_kinds: &[SchemaKind],
+            convert: F,
+        ) -> AvroResult<Schema>
+        where
+            F: Fn(Schema) -> AvroResult<Schema>,
+        {
+            let kind = SchemaKind::from(schema.clone());
+            if supported_schema_kinds.contains(&kind) {
+                convert(schema)
+            } else {
+                warn!(
+                    "Ignoring unknown logical type '{logical_type}' for schema 
of type: {schema:?}!"
+                );
+                Ok(schema)
+            }
+        }
+
+        match complex.get("logicalType") {
+            Some(Value::String(t)) => match t.as_str() {
+                "decimal" => {
+                    return try_convert_to_logical_type(
+                        "decimal",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Fixed, SchemaKind::Bytes],
+                        |inner| -> AvroResult<Schema> {
+                            match self.parse_precision_and_scale(complex) {
+                                Ok((precision, scale)) => 
Ok(Schema::Decimal(DecimalSchema {
+                                    precision,
+                                    scale,
+                                    inner: inner.try_into()?,
+                                })),
+                                Err(err) => {
+                                    warn!("Ignoring invalid decimal logical 
type: {err}");
+                                    Ok(inner)
+                                }
+                            }
+                        },
+                    );
+                }
+                "big-decimal" => {
+                    return try_convert_to_logical_type(
+                        "big-decimal",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Bytes],
+                        |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
+                    );
+                }
+                "uuid" => {
+                    return try_convert_to_logical_type(
+                        "uuid",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::String, SchemaKind::Fixed, 
SchemaKind::Bytes],
+                        |schema| match schema {
+                            Schema::String => 
Ok(Schema::Uuid(UuidSchema::String)),
+                            Schema::Fixed(fixed @ FixedSchema { size: 16, .. 
}) => {
+                                Ok(Schema::Uuid(UuidSchema::Fixed(fixed)))
+                            }
+                            Schema::Fixed(FixedSchema { size, .. }) => {
+                                warn!(
+                                    "Ignoring uuid logical type for a Fixed 
schema because its size ({size:?}) is not 16! Schema: {schema:?}"
+                                );
+                                Ok(schema)
+                            }
+                            Schema::Bytes => 
Ok(Schema::Uuid(UuidSchema::Bytes)),
+                            _ => {
+                                warn!("Ignoring invalid uuid logical type for 
schema: {schema:?}");
+                                Ok(schema)
+                            }
+                        },
+                    );
+                }
+                "date" => {
+                    return try_convert_to_logical_type(
+                        "date",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Int],
+                        |_| -> AvroResult<Schema> { Ok(Schema::Date) },
+                    );
+                }
+                "time-millis" => {
+                    return try_convert_to_logical_type(
+                        "time-millis",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Int],
+                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
+                    );
+                }
+                "time-micros" => {
+                    return try_convert_to_logical_type(
+                        "time-micros",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
+                    );
+                }
+                "timestamp-millis" => {
+                    return try_convert_to_logical_type(
+                        "timestamp-millis",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { 
Ok(Schema::TimestampMillis) },
+                    );
+                }
+                "timestamp-micros" => {
+                    return try_convert_to_logical_type(
+                        "timestamp-micros",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { 
Ok(Schema::TimestampMicros) },
+                    );
+                }
+                "timestamp-nanos" => {
+                    return try_convert_to_logical_type(
+                        "timestamp-nanos",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) 
},
+                    );
+                }
+                "local-timestamp-millis" => {
+                    return try_convert_to_logical_type(
+                        "local-timestamp-millis",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { 
Ok(Schema::LocalTimestampMillis) },
+                    );
+                }
+                "local-timestamp-micros" => {
+                    return try_convert_to_logical_type(
+                        "local-timestamp-micros",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { 
Ok(Schema::LocalTimestampMicros) },
+                    );
+                }
+                "local-timestamp-nanos" => {
+                    return try_convert_to_logical_type(
+                        "local-timestamp-nanos",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { 
Ok(Schema::LocalTimestampNanos) },
+                    );
+                }
+                "duration" => {
+                    return try_convert_to_logical_type(
+                        "duration",
+                        parse_as_native_complex(complex, self, 
enclosing_namespace)?,
+                        &[SchemaKind::Fixed],
+                        |schema| -> AvroResult<Schema> {
+                            match schema {
+                                Schema::Fixed(fixed @ FixedSchema { size: 12, 
.. }) => {
+                                    Ok(Schema::Duration(fixed))
+                                }
+                                Schema::Fixed(FixedSchema { size, .. }) => {
+                                    warn!(
+                                        "Ignoring duration logical type on 
fixed type because size ({size}) is not 12! Schema: {schema:?}"
+                                    );
+                                    Ok(schema)
+                                }
+                                _ => {
+                                    warn!(
+                                        "Ignoring invalid duration logical 
type for schema: {schema:?}"
+                                    );
+                                    Ok(schema)
+                                }
+                            }
+                        },
+                    );
+                }
+                // In this case, of an unknown logical type, we just pass 
through the underlying
+                // type.
+                _ => {}
+            },
+            // The spec says to ignore invalid logical types and just pass 
through the
+            // underlying type. It is unclear whether that applies to this 
case or not, where the
+            // `logicalType` is not a string.
+            Some(value) => return 
Err(Details::GetLogicalTypeFieldType(value.clone()).into()),
+            _ => {}
+        }
+        match complex.get("type") {
+            Some(Value::String(t)) => match t.as_str() {
+                "record" => match parse_location {
+                    RecordSchemaParseLocation::Root => {
+                        self.parse_record(complex, enclosing_namespace)
+                    }
+                    RecordSchemaParseLocation::FromField => {
+                        self.fetch_schema_ref(t, enclosing_namespace)
+                    }
+                },
+                "enum" => self.parse_enum(complex, enclosing_namespace),
+                "array" => self.parse_array(complex, enclosing_namespace),
+                "map" => self.parse_map(complex, enclosing_namespace),
+                "fixed" => self.parse_fixed(complex, enclosing_namespace),
+                other => self.parse_known_schema(other, enclosing_namespace),
+            },
+            Some(Value::Object(data)) => {
+                self.parse_complex(data, enclosing_namespace, 
RecordSchemaParseLocation::Root)
+            }
+            Some(Value::Array(variants)) => self.parse_union(variants, 
enclosing_namespace),
+            Some(unknown) => 
Err(Details::GetComplexType(unknown.clone()).into()),
+            None => Err(Details::GetComplexTypeField.into()),
+        }
+    }
+
+    fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
+        let resolving_schema = Schema::Ref { name: name.clone() };
+        self.resolving_schemas
+            .insert(name.clone(), resolving_schema.clone());
+
+        let namespace = &name.namespace;
+
+        if let Some(aliases) = aliases {
+            aliases.iter().for_each(|alias| {
+                let alias_fullname = alias.fully_qualified_name(namespace);
+                self.resolving_schemas
+                    .insert(alias_fullname, resolving_schema.clone());
+            });
+        }
+    }
+
+    fn register_parsed_schema(
+        &mut self,
+        fully_qualified_name: &Name,
+        schema: &Schema,
+        aliases: &Aliases,
+    ) {
+        // FIXME, this should be globally aware, so if there is something 
overwriting something
+        // else then there is an ambiguous schema definition. An appropriate 
error should be thrown
+        self.parsed_schemas
+            .insert(fully_qualified_name.clone(), schema.clone());
+        self.resolving_schemas.remove(fully_qualified_name);
+
+        let namespace = &fully_qualified_name.namespace;
+
+        if let Some(aliases) = aliases {
+            aliases.iter().for_each(|alias| {
+                let alias_fullname = alias.fully_qualified_name(namespace);
+                self.resolving_schemas.remove(&alias_fullname);
+                self.parsed_schemas.insert(alias_fullname, schema.clone());
+            });
+        }
+    }
+
+    /// Returns already parsed schema or a schema that is currently being 
resolved.
+    fn get_already_seen_schema(
+        &self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> Option<&Schema> {
+        match complex.get("type") {
+            Some(Value::String(typ)) => {
+                let name = Name::new(typ.as_str())
+                    .unwrap()
+                    .fully_qualified_name(enclosing_namespace);
+                self.resolving_schemas
+                    .get(&name)
+                    .or_else(|| self.parsed_schemas.get(&name))
+            }
+            _ => None,
+        }
+    }
+
+    /// Parse a `serde_json::Value` representing an Avro record type into a
+    /// `Schema`.
+    fn parse_record(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let fields_opt = complex.get("fields");
+
+        if fields_opt.is_none()
+            && let Some(seen) = self.get_already_seen_schema(complex, 
enclosing_namespace)
+        {
+            return Ok(seen.clone());
+        }
+
+        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
+        let aliases =
+            self.fix_aliases_namespace(complex.aliases(), 
&fully_qualified_name.namespace);
+
+        let mut lookup = BTreeMap::new();
+
+        self.register_resolving_schema(&fully_qualified_name, &aliases);
+
+        debug!("Going to parse record schema: {:?}", &fully_qualified_name);
+
+        let fields: Vec<RecordField> = fields_opt
+            .and_then(|fields| fields.as_array())
+            .ok_or_else(|| Error::new(Details::GetRecordFieldsJson))
+            .and_then(|fields| {
+                fields
+                    .iter()
+                    .filter_map(|field| field.as_object())
+                    .enumerate()
+                    .map(|(position, field)| {
+                        RecordField::parse(field, position, self, 
&fully_qualified_name)
+                    })
+                    .collect::<Result<_, _>>()
+            })?;
+
+        for field in &fields {
+            if let Some(_old) = lookup.insert(field.name.clone(), 
field.position) {
+                return 
Err(Details::FieldNameDuplicate(field.name.clone()).into());
+            }
+
+            if let Some(ref field_aliases) = field.aliases {
+                for alias in field_aliases {
+                    lookup.insert(alias.clone(), field.position);
+                }
+            }
+        }
+
+        let schema = Schema::Record(RecordSchema {
+            name: fully_qualified_name.clone(),
+            aliases: aliases.clone(),
+            doc: complex.doc(),
+            fields,
+            lookup,
+            attributes: self.get_custom_attributes(complex, vec!["fields"]),
+        });
+
+        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
+        Ok(schema)
+    }
+
+    fn get_custom_attributes(
+        &self,
+        complex: &Map<String, Value>,
+        excluded: Vec<&'static str>,
+    ) -> BTreeMap<String, Value> {
+        let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
+        for (key, value) in complex {
+            match key.as_str() {
+                "type" | "name" | "namespace" | "doc" | "aliases" | 
"logicalType" => continue,
+                candidate if excluded.contains(&candidate) => continue,
+                _ => custom_attributes.insert(key.clone(), value.clone()),
+            };
+        }
+        custom_attributes
+    }
+
+    /// Parses a JSON object describing an Avro `enum` into a `Schema::Enum`.
+    ///
+    /// If the object has no `symbols` attribute it is first looked up among
+    /// the already seen schemas; when found, a clone of that schema is returned.
+    ///
+    /// Otherwise the symbols must be a JSON array of strings, each symbol must
+    /// pass `validate_enum_symbol_name`, and no symbol may occur twice. An
+    /// optional `default` must be a JSON string that resolves against the
+    /// symbols. The resulting schema is registered under its fully qualified
+    /// name and aliases before it is returned.
+    fn parse_enum(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let raw_symbols = complex.get("symbols");
+
+        // Without "symbols" this may be a reference to an enum parsed earlier.
+        if raw_symbols.is_none()
+            && let Some(known) = self.get_already_seen_schema(complex, enclosing_namespace)
+        {
+            return Ok(known.clone());
+        }
+
+        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
+        let aliases =
+            self.fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
+
+        // "symbols" must be present and must be a JSON array ...
+        let Some(symbol_values) = raw_symbols.and_then(Value::as_array) else {
+            return Err(Details::GetEnumSymbolsField.into());
+        };
+
+        // ... whose elements are all JSON strings.
+        let mut symbols: Vec<String> = Vec::with_capacity(symbol_values.len());
+        for symbol_value in symbol_values {
+            match symbol_value.as_str() {
+                Some(symbol) => symbols.push(symbol.to_string()),
+                None => return Err(Details::GetEnumSymbols.into()),
+            }
+        }
+
+        let mut unique_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
+        for symbol in &symbols {
+            validate_enum_symbol_name(symbol)?;
+
+            // `insert` returns `false` when the symbol is already in the set,
+            // i.e. when it is a duplicate.
+            if !unique_symbols.insert(symbol) {
+                return Err(Details::EnumSymbolDuplicate(symbol.to_string()).into());
+            }
+        }
+
+        let default: Option<String> = match complex.get("default") {
+            None => None,
+            Some(Value::String(symbol)) => Some(symbol.clone()),
+            Some(other) => return Err(Details::EnumDefaultWrongType(other.clone()).into()),
+        };
+
+        // The default, when given, has to resolve against the declared symbols.
+        if let Some(default_symbol) = &default {
+            let unresolved = types::Value::from(default_symbol.clone())
+                .resolve_enum(&symbols, &Some(default_symbol.to_string()), &None)
+                .is_err();
+            if unresolved {
+                return Err(Details::GetEnumDefault {
+                    symbol: default_symbol.to_string(),
+                    symbols,
+                }
+                .into());
+            }
+        }
+
+        let attributes = self.get_custom_attributes(complex, vec!["symbols"]);
+        let schema = Schema::Enum(EnumSchema {
+            name: fully_qualified_name.clone(),
+            aliases: aliases.clone(),
+            doc: complex.doc(),
+            symbols,
+            default,
+            attributes,
+        });
+
+        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
+
+        Ok(schema)
+    }
+
+    /// Parses a JSON object describing an Avro `array` into a `Schema`.
+    ///
+    /// The element schema is read from the `items` attribute; its absence is
+    /// reported as `Details::GetArrayItemsField`. All remaining non-reserved
+    /// attributes are attached to the array schema as custom attributes.
+    fn parse_array(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let items = complex.get("items").ok_or(Details::GetArrayItemsField)?;
+        let item_schema = self.parse(items, enclosing_namespace)?;
+        let attributes = self.get_custom_attributes(complex, vec!["items"]);
+
+        Ok(Schema::array_with_attributes(item_schema, attributes))
+    }
+
+    /// Parses a JSON object describing an Avro `map` into a `Schema`.
+    ///
+    /// The value schema is read from the `values` attribute; its absence is
+    /// reported as `Details::GetMapValuesField`. All remaining non-reserved
+    /// attributes are attached to the map schema as custom attributes.
+    fn parse_map(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let values = complex.get("values").ok_or(Details::GetMapValuesField)?;
+        let value_schema = self.parse(values, enclosing_namespace)?;
+        let attributes = self.get_custom_attributes(complex, vec!["values"]);
+
+        Ok(Schema::map_with_attributes(value_schema, attributes))
+    }
+
+    /// Parses a JSON array describing an Avro `union` into a `Schema::Union`.
+    ///
+    /// Each element of `items` is parsed in order; the first failure aborts
+    /// the parsing. An empty union is logged at error level and a union with
+    /// a single member at warn level. Neither is rejected here: the final
+    /// decision is left to `UnionSchema::new`.
+    fn parse_union(
+        &mut self,
+        items: &[Value],
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let mut members = Vec::with_capacity(items.len());
+        for item in items {
+            members.push(self.parse(item, enclosing_namespace)?);
+        }
+
+        match members.len() {
+            0 => error!(
+                "Union schemas should have at least two members! \
+                Please enable debug logging to find out which Record schema \
+                declares the union with 'RUST_LOG=apache_avro::schema=debug'."
+            ),
+            1 => warn!(
+                "Union schema with just one member! Consider dropping the union! \
+                Please enable debug logging to find out which Record schema \
+                declares the union with 'RUST_LOG=apache_avro::schema=debug'."
+            ),
+            _ => {}
+        }
+
+        Ok(Schema::Union(UnionSchema::new(members)?))
+    }
+
+    /// Parses a JSON object describing an Avro `fixed` type into a
+    /// `Schema::Fixed`.
+    ///
+    /// If the object has no `size` attribute it is first looked up among the
+    /// already seen schemas; when found, a clone of that schema is returned.
+    /// The resulting schema is registered under its fully qualified name and
+    /// aliases before it is returned.
+    ///
+    /// A `default` attribute that is not a JSON string is ignored.
+    ///
+    /// # Errors
+    ///
+    /// * `Details::GetFixedSizeField` if `size` is missing and no already
+    ///   seen schema matches.
+    /// * `Details::GetFixedSizeFieldPositive` if `size` cannot be read as a
+    ///   `u64`.
+    /// * `Details::ConvertU64ToUsize` if `size` does not fit into `usize`.
+    /// * `Details::FixedDefaultLenSizeMismatch` if the byte length of a
+    ///   string `default` differs from `size`.
+    /// * Any error returned by `Name::parse`.
+    fn parse_fixed(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let size_opt = complex.get("size");
+        // Without "size" this may be a reference to a fixed parsed earlier.
+        if size_opt.is_none()
+            && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace)
+        {
+            return Ok(seen.clone());
+        }
+
+        // Same helper as used by the record and enum parsers.
+        let doc = complex.doc();
+
+        let size = match size_opt {
+            Some(size) => size
+                .as_u64()
+                .ok_or_else(|| Details::GetFixedSizeFieldPositive(size.clone())),
+            None => Err(Details::GetFixedSizeField),
+        }?;
+        // Checked conversion: an `as` cast would silently truncate the size
+        // on targets where `usize` is narrower than `u64`.
+        let size_in_bytes =
+            usize::try_from(size).map_err(|e| Details::ConvertU64ToUsize(e, size))?;
+
+        let default = complex.get("default").and_then(|v| match v {
+            Value::String(default) => Some(default.clone()),
+            _ => None,
+        });
+
+        // The default is compared by its length in bytes.
+        if let Some(default) = &default {
+            let len = default.len();
+            if len != size_in_bytes {
+                return Err(Details::FixedDefaultLenSizeMismatch(len, size).into());
+            }
+        }
+
+        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
+        let aliases =
+            self.fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
+
+        let schema = Schema::Fixed(FixedSchema {
+            name: fully_qualified_name.clone(),
+            aliases: aliases.clone(),
+            doc,
+            size: size_in_bytes,
+            default,
+            attributes: self.get_custom_attributes(complex, vec!["size"]),
+        });
+
+        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
+
+        Ok(schema)
+    }
+
+    /// Qualifies the aliases of a named schema with the namespace of that schema.
+    ///
+    /// A type alias may be specified either as fully namespace-qualified, or
+    /// relative to the namespace of the name it is an alias for. For example,
+    /// if a type named "a.b" has aliases of "c" and "x.y", then the fully
+    /// qualified names of its aliases are "a.c" and "x.y".
+    /// https://avro.apache.org/docs/current/specification/#aliases
+    ///
+    /// # Panics
+    ///
+    /// Panics if `Alias::new` rejects one of the resulting alias names.
+    fn fix_aliases_namespace(
+        &self,
+        aliases: Option<Vec<String>>,
+        namespace: &Namespace,
+    ) -> Aliases {
+        aliases.map(|names| {
+            names
+                .into_iter()
+                .map(|alias| {
+                    // Only an alias without any dot is relative to `namespace`.
+                    let qualified = match namespace {
+                        Some(ns) if !alias.contains('.') => format!("{ns}.{alias}"),
+                        _ => alias,
+                    };
+                    Alias::new(qualified.as_str()).unwrap()
+                })
+                .collect()
+        })
+    }
+
+    /// Returns the name declared by the nested `type` object of `value`.
+    ///
+    /// When `value["type"]` is a JSON object for which `name()` yields a
+    /// value, that name is returned; in every other case the given `name`
+    /// is returned unchanged.
+    ///
+    /// # Panics
+    ///
+    /// Panics if `Name::new` rejects the name of the nested type.
+    fn get_schema_type_name(&self, name: Name, value: Value) -> Name {
+        let nested_type_name = value
+            .get("type")
+            .and_then(Value::as_object)
+            .and_then(|complex_type| complex_type.name());
+
+        match nested_type_name {
+            Some(type_name) => Name::new(type_name.as_str()).unwrap(),
+            None => name,
+        }
+    }
+
+    /// Converts a JSON number into the `DecimalMetadata` used for the
+    /// precision and scale of a decimal.
+    ///
+    /// The number is read as `u64` if possible, otherwise as `i64`, and then
+    /// converted with a range check; a value that does not fit is reported as
+    /// `Details::ConvertU64ToUsize` or `Details::ConvertI64ToUsize`. A number
+    /// that is neither (e.g. a float) is reported as
+    /// `Details::GetPrecisionOrScaleFromJson`.
+    fn parse_json_integer_for_decimal(
+        &self,
+        value: &serde_json::Number,
+    ) -> AvroResult<DecimalMetadata> {
+        let metadata = if let Some(num) = value.as_u64() {
+            DecimalMetadata::try_from(num).map_err(|e| Details::ConvertU64ToUsize(e, num))?
+        } else if let Some(num) = value.as_i64() {
+            // Only reached by numbers that are not representable as `u64`.
+            DecimalMetadata::try_from(num).map_err(|e| Details::ConvertI64ToUsize(e, num))?
+        } else {
+            return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into());
+        };
+
+        Ok(metadata)
+    }
+}
diff --git a/avro/src/schema/record/field.rs b/avro/src/schema/record/field.rs
index 365190c..9a42310 100644
--- a/avro/src/schema/record/field.rs
+++ b/avro/src/schema/record/field.rs
@@ -92,7 +92,7 @@ impl RecordField {
             &schema,
             &name,
             &enclosing_record.fullname(None),
-            &parser.parsed_schemas,
+            parser.get_parsed_schemas(),
             &default,
         )?;
 

Reply via email to