This is an automated email from the ASF dual-hosted git repository. mgrigorov pushed a commit to branch extract-schema-parser-to-its-own-module in repository https://gitbox.apache.org/repos/asf/avro-rs.git
commit 971bac9e0871c48205ec17f1df5f6f963ea9a3cc Author: Martin Tzvetanov Grigorov <[email protected]> AuthorDate: Tue Jan 20 09:47:01 2026 +0200 chore: Extract Parser to its own module (src/schema/parser.rs) Minor internal API changes to fix the build. No API breaks! --- avro/src/schema/mod.rs | 842 +------------------------------------- avro/src/schema/parser.rs | 881 ++++++++++++++++++++++++++++++++++++++++ avro/src/schema/record/field.rs | 2 +- 3 files changed, 892 insertions(+), 833 deletions(-) diff --git a/avro/src/schema/mod.rs b/avro/src/schema/mod.rs index 42a0b80..d915f46 100644 --- a/avro/src/schema/mod.rs +++ b/avro/src/schema/mod.rs @@ -21,10 +21,9 @@ use crate::{ error::{Details, Error}, schema_equality, types, util::MapHelper, - validator::{validate_enum_symbol_name, validate_namespace, validate_schema_name}, + validator::{validate_namespace, validate_schema_name}, }; use digest::Digest; -use log::{debug, error, warn}; use serde::{ Deserialize, Serialize, Serializer, ser::{SerializeMap, SerializeSeq}, @@ -47,6 +46,8 @@ pub use record::{ }; mod union; pub use union::UnionSchema; +mod parser; +use parser::Parser; /// Represents an Avro schema fingerprint /// More information about Avro schema fingerprints can be found in the @@ -780,37 +781,6 @@ type DecimalMetadata = usize; pub(crate) type Precision = DecimalMetadata; pub(crate) type Scale = DecimalMetadata; -fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> { - Ok(if value.is_u64() { - let num = value - .as_u64() - .ok_or_else(|| Details::GetU64FromJson(value.clone()))?; - num.try_into() - .map_err(|e| Details::ConvertU64ToUsize(e, num))? - } else if value.is_i64() { - let num = value - .as_i64() - .ok_or_else(|| Details::GetI64FromJson(value.clone()))?; - num.try_into() - .map_err(|e| Details::ConvertI64ToUsize(e, num))? 
- } else { - return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into()); - }) -} - -#[derive(Default)] -pub(crate) struct Parser { - input_schemas: HashMap<Name, Value>, - /// A map of name -> Schema::Ref - /// Used to resolve cyclic references, i.e. when a - /// field's type is a reference to its record's type - resolving_schemas: Names, - input_order: Vec<Name>, - /// A map of name -> fully parsed Schema - /// Used to avoid parsing the same schema twice - parsed_schemas: Names, -} - impl Schema { /// Converts `self` into its [Parsing Canonical Form]. /// @@ -880,12 +850,11 @@ impl Schema { return Err(Details::GetNameField.into()); } } - let mut parser = Parser { + let mut parser = Parser::new( input_schemas, - resolving_schemas: HashMap::default(), input_order, - parsed_schemas: HashMap::with_capacity(input_len), - }; + HashMap::with_capacity(input_len), + ); parser.parse_list() } @@ -922,12 +891,11 @@ impl Schema { return Err(Details::GetNameField.into()); } } - let mut parser = Parser { + let mut parser = Parser::new( input_schemas, - resolving_schemas: HashMap::default(), input_order, - parsed_schemas: HashMap::with_capacity(schemata_len), - }; + HashMap::with_capacity(schemata_len), + ); parser.parse_input_schemas()?; let value = serde_json::from_str(schema).map_err(Details::ParseSchemaJson)?; @@ -954,12 +922,7 @@ impl Schema { /// Parses an Avro schema from JSON. /// Any `Schema::Ref`s must be known in the `names` map. pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> { - let mut parser = Parser { - input_schemas: HashMap::with_capacity(1), - resolving_schemas: Names::default(), - input_order: Vec::with_capacity(1), - parsed_schemas: names, - }; + let mut parser = Parser::new(HashMap::with_capacity(1), Vec::with_capacity(1), names); parser.parse(value, &None) } @@ -1129,791 +1092,6 @@ impl Schema { } } -impl Parser { - /// Create a `Schema` from a string representing a JSON Avro schema. 
- fn parse_str(&mut self, input: &str) -> Result<Schema, Error> { - let value = serde_json::from_str(input).map_err(Details::ParseSchemaJson)?; - self.parse(&value, &None) - } - - /// Create an array of `Schema`'s from an iterator of JSON Avro schemas. It is allowed that - /// the schemas have cross-dependencies; these will be resolved during parsing. - fn parse_list(&mut self) -> Result<Vec<Schema>, Error> { - self.parse_input_schemas()?; - - let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len()); - for name in self.input_order.drain(0..) { - let parsed = self - .parsed_schemas - .remove(&name) - .expect("One of the input schemas was unexpectedly not parsed"); - parsed_schemas.push(parsed); - } - Ok(parsed_schemas) - } - - /// Convert the input schemas to parsed_schemas - fn parse_input_schemas(&mut self) -> Result<(), Error> { - while !self.input_schemas.is_empty() { - let next_name = self - .input_schemas - .keys() - .next() - .expect("Input schemas unexpectedly empty") - .to_owned(); - let (name, value) = self - .input_schemas - .remove_entry(&next_name) - .expect("Key unexpectedly missing"); - let parsed = self.parse(&value, &None)?; - self.parsed_schemas - .insert(get_schema_type_name(name, value), parsed); - } - Ok(()) - } - - /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro - /// schema. - fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) -> AvroResult<Schema> { - match *value { - Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace), - Value::Object(ref data) => { - self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root) - } - Value::Array(ref data) => self.parse_union(data, enclosing_namespace), - _ => Err(Details::ParseSchemaFromValidJson.into()), - } - } - - /// Parse a `serde_json::Value` representing an Avro type whose Schema is known into a - /// `Schema`. 
A Schema for a `serde_json::Value` is known if it is primitive or has - /// been parsed previously by the parsed and stored in its map of parsed_schemas. - fn parse_known_schema( - &mut self, - name: &str, - enclosing_namespace: &Namespace, - ) -> AvroResult<Schema> { - match name { - "null" => Ok(Schema::Null), - "boolean" => Ok(Schema::Boolean), - "int" => Ok(Schema::Int), - "long" => Ok(Schema::Long), - "double" => Ok(Schema::Double), - "float" => Ok(Schema::Float), - "bytes" => Ok(Schema::Bytes), - "string" => Ok(Schema::String), - _ => self.fetch_schema_ref(name, enclosing_namespace), - } - } - - /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`. - /// If a parsed schema is not found, it checks if a currently resolving - /// schema with that name exists. - /// If a resolving schema is not found, it checks if a json with that name exists - /// in `input_schemas` and then parses it (removing it from `input_schemas`) - /// and adds the parsed schema to `parsed_schemas`. - /// - /// This method allows schemas definitions that depend on other types to - /// parse their dependencies (or look them up if already parsed). - fn fetch_schema_ref( - &mut self, - name: &str, - enclosing_namespace: &Namespace, - ) -> AvroResult<Schema> { - fn get_schema_ref(parsed: &Schema) -> Schema { - match parsed { - &Schema::Record(RecordSchema { ref name, .. }) - | &Schema::Enum(EnumSchema { ref name, .. }) - | &Schema::Fixed(FixedSchema { ref name, .. 
}) => { - Schema::Ref { name: name.clone() } - } - _ => parsed.clone(), - } - } - - let name = Name::new(name)?; - let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); - - if self.parsed_schemas.contains_key(&fully_qualified_name) { - return Ok(Schema::Ref { - name: fully_qualified_name, - }); - } - if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) { - return Ok(resolving_schema.clone()); - } - - // For good error reporting we add this check - match name.name.as_str() { - "record" | "enum" | "fixed" => { - return Err(Details::InvalidSchemaRecord(name.to_string()).into()); - } - _ => (), - } - - let value = self - .input_schemas - .remove(&fully_qualified_name) - // TODO make a better descriptive error message here that conveys that a named schema cannot be found - .ok_or_else(|| Details::ParsePrimitive(fully_qualified_name.fullname(None)))?; - - // parsing a full schema from inside another schema. Other full schema will not inherit namespace - let parsed = self.parse(&value, &None)?; - self.parsed_schemas - .insert(get_schema_type_name(name, value), parsed.clone()); - - Ok(get_schema_ref(&parsed)) - } - - fn parse_precision_and_scale( - complex: &Map<String, Value>, - ) -> Result<(Precision, Scale), Error> { - fn get_decimal_integer( - complex: &Map<String, Value>, - key: &'static str, - ) -> Result<DecimalMetadata, Error> { - match complex.get(key) { - Some(Value::Number(value)) => parse_json_integer_for_decimal(value), - None => { - if key == "scale" { - Ok(0) - } else { - Err(Details::GetDecimalMetadataFromJson(key).into()) - } - } - Some(value) => Err(Details::GetDecimalMetadataValueFromJson { - key: key.into(), - value: value.clone(), - } - .into()), - } - } - let precision = get_decimal_integer(complex, "precision")?; - let scale = get_decimal_integer(complex, "scale")?; - - if precision < 1 { - return Err(Details::DecimalPrecisionMuBePositive { precision }.into()); - } - - if precision < scale { - 
Err(Details::DecimalPrecisionLessThanScale { precision, scale }.into()) - } else { - Ok((precision, scale)) - } - } - - /// Parse a `serde_json::Value` representing a complex Avro type into a - /// `Schema`. - /// - /// Avro supports "recursive" definition of types. - /// e.g: {"type": {"type": "string"}} - fn parse_complex( - &mut self, - complex: &Map<String, Value>, - enclosing_namespace: &Namespace, - parse_location: RecordSchemaParseLocation, - ) -> AvroResult<Schema> { - // Try to parse this as a native complex type. - fn parse_as_native_complex( - complex: &Map<String, Value>, - parser: &mut Parser, - enclosing_namespace: &Namespace, - ) -> AvroResult<Schema> { - match complex.get("type") { - Some(value) => match value { - Value::String(s) if s == "fixed" => { - parser.parse_fixed(complex, enclosing_namespace) - } - _ => parser.parse(value, enclosing_namespace), - }, - None => Err(Details::GetLogicalTypeField.into()), - } - } - - // This crate support some logical types natively, and this function tries to convert - // a native complex type with a logical type attribute to these logical types. - // This function: - // 1. Checks whether the native complex type is in the supported kinds. - // 2. If it is, using the convert function to convert the native complex type to - // a logical type. - fn try_convert_to_logical_type<F>( - logical_type: &str, - schema: Schema, - supported_schema_kinds: &[SchemaKind], - convert: F, - ) -> AvroResult<Schema> - where - F: Fn(Schema) -> AvroResult<Schema>, - { - let kind = SchemaKind::from(schema.clone()); - if supported_schema_kinds.contains(&kind) { - convert(schema) - } else { - warn!( - "Ignoring unknown logical type '{logical_type}' for schema of type: {schema:?}!" 
- ); - Ok(schema) - } - } - - match complex.get("logicalType") { - Some(Value::String(t)) => match t.as_str() { - "decimal" => { - return try_convert_to_logical_type( - "decimal", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Fixed, SchemaKind::Bytes], - |inner| -> AvroResult<Schema> { - match Self::parse_precision_and_scale(complex) { - Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema { - precision, - scale, - inner: inner.try_into()?, - })), - Err(err) => { - warn!("Ignoring invalid decimal logical type: {err}"); - Ok(inner) - } - } - }, - ); - } - "big-decimal" => { - return try_convert_to_logical_type( - "big-decimal", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Bytes], - |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) }, - ); - } - "uuid" => { - return try_convert_to_logical_type( - "uuid", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::String, SchemaKind::Fixed, SchemaKind::Bytes], - |schema| match schema { - Schema::String => Ok(Schema::Uuid(UuidSchema::String)), - Schema::Fixed(fixed @ FixedSchema { size: 16, .. }) => { - Ok(Schema::Uuid(UuidSchema::Fixed(fixed))) - } - Schema::Fixed(FixedSchema { size, .. }) => { - warn!( - "Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! 
Schema: {schema:?}" - ); - Ok(schema) - } - Schema::Bytes => Ok(Schema::Uuid(UuidSchema::Bytes)), - _ => { - warn!("Ignoring invalid uuid logical type for schema: {schema:?}"); - Ok(schema) - } - }, - ); - } - "date" => { - return try_convert_to_logical_type( - "date", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Int], - |_| -> AvroResult<Schema> { Ok(Schema::Date) }, - ); - } - "time-millis" => { - return try_convert_to_logical_type( - "date", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Int], - |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) }, - ); - } - "time-micros" => { - return try_convert_to_logical_type( - "time-micros", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Long], - |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) }, - ); - } - "timestamp-millis" => { - return try_convert_to_logical_type( - "timestamp-millis", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Long], - |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) }, - ); - } - "timestamp-micros" => { - return try_convert_to_logical_type( - "timestamp-micros", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Long], - |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) }, - ); - } - "timestamp-nanos" => { - return try_convert_to_logical_type( - "timestamp-nanos", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Long], - |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) }, - ); - } - "local-timestamp-millis" => { - return try_convert_to_logical_type( - "local-timestamp-millis", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Long], - |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) }, - ); - } - "local-timestamp-micros" => { - return try_convert_to_logical_type( - "local-timestamp-micros", - parse_as_native_complex(complex, self, 
enclosing_namespace)?, - &[SchemaKind::Long], - |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) }, - ); - } - "local-timestamp-nanos" => { - return try_convert_to_logical_type( - "local-timestamp-nanos", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Long], - |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) }, - ); - } - "duration" => { - return try_convert_to_logical_type( - "duration", - parse_as_native_complex(complex, self, enclosing_namespace)?, - &[SchemaKind::Fixed], - |schema| -> AvroResult<Schema> { - match schema { - Schema::Fixed(fixed @ FixedSchema { size: 12, .. }) => { - Ok(Schema::Duration(fixed)) - } - Schema::Fixed(FixedSchema { size, .. }) => { - warn!( - "Ignoring duration logical type on fixed type because size ({size}) is not 12! Schema: {schema:?}" - ); - Ok(schema) - } - _ => { - warn!( - "Ignoring invalid duration logical type for schema: {schema:?}" - ); - Ok(schema) - } - } - }, - ); - } - // In this case, of an unknown logical type, we just pass through the underlying - // type. - _ => {} - }, - // The spec says to ignore invalid logical types and just pass through the - // underlying type. It is unclear whether that applies to this case or not, where the - // `logicalType` is not a string. 
- Some(value) => return Err(Details::GetLogicalTypeFieldType(value.clone()).into()), - _ => {} - } - match complex.get("type") { - Some(Value::String(t)) => match t.as_str() { - "record" => match parse_location { - RecordSchemaParseLocation::Root => { - self.parse_record(complex, enclosing_namespace) - } - RecordSchemaParseLocation::FromField => { - self.fetch_schema_ref(t, enclosing_namespace) - } - }, - "enum" => self.parse_enum(complex, enclosing_namespace), - "array" => self.parse_array(complex, enclosing_namespace), - "map" => self.parse_map(complex, enclosing_namespace), - "fixed" => self.parse_fixed(complex, enclosing_namespace), - other => self.parse_known_schema(other, enclosing_namespace), - }, - Some(Value::Object(data)) => { - self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root) - } - Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace), - Some(unknown) => Err(Details::GetComplexType(unknown.clone()).into()), - None => Err(Details::GetComplexTypeField.into()), - } - } - - fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) { - let resolving_schema = Schema::Ref { name: name.clone() }; - self.resolving_schemas - .insert(name.clone(), resolving_schema.clone()); - - let namespace = &name.namespace; - - if let Some(aliases) = aliases { - aliases.iter().for_each(|alias| { - let alias_fullname = alias.fully_qualified_name(namespace); - self.resolving_schemas - .insert(alias_fullname, resolving_schema.clone()); - }); - } - } - - fn register_parsed_schema( - &mut self, - fully_qualified_name: &Name, - schema: &Schema, - aliases: &Aliases, - ) { - // FIXME, this should be globally aware, so if there is something overwriting something - // else then there is an ambiguous schema definition. 
An appropriate error should be thrown - self.parsed_schemas - .insert(fully_qualified_name.clone(), schema.clone()); - self.resolving_schemas.remove(fully_qualified_name); - - let namespace = &fully_qualified_name.namespace; - - if let Some(aliases) = aliases { - aliases.iter().for_each(|alias| { - let alias_fullname = alias.fully_qualified_name(namespace); - self.resolving_schemas.remove(&alias_fullname); - self.parsed_schemas.insert(alias_fullname, schema.clone()); - }); - } - } - - /// Returns already parsed schema or a schema that is currently being resolved. - fn get_already_seen_schema( - &self, - complex: &Map<String, Value>, - enclosing_namespace: &Namespace, - ) -> Option<&Schema> { - match complex.get("type") { - Some(Value::String(typ)) => { - let name = Name::new(typ.as_str()) - .unwrap() - .fully_qualified_name(enclosing_namespace); - self.resolving_schemas - .get(&name) - .or_else(|| self.parsed_schemas.get(&name)) - } - _ => None, - } - } - - /// Parse a `serde_json::Value` representing a Avro record type into a - /// `Schema`. 
- fn parse_record( - &mut self, - complex: &Map<String, Value>, - enclosing_namespace: &Namespace, - ) -> AvroResult<Schema> { - let fields_opt = complex.get("fields"); - - if fields_opt.is_none() - && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) - { - return Ok(seen.clone()); - } - - let fully_qualified_name = Name::parse(complex, enclosing_namespace)?; - let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace); - - let mut lookup = BTreeMap::new(); - - self.register_resolving_schema(&fully_qualified_name, &aliases); - - debug!("Going to parse record schema: {:?}", &fully_qualified_name); - - let fields: Vec<RecordField> = fields_opt - .and_then(|fields| fields.as_array()) - .ok_or_else(|| Error::new(Details::GetRecordFieldsJson)) - .and_then(|fields| { - fields - .iter() - .filter_map(|field| field.as_object()) - .enumerate() - .map(|(position, field)| { - RecordField::parse(field, position, self, &fully_qualified_name) - }) - .collect::<Result<_, _>>() - })?; - - for field in &fields { - if let Some(_old) = lookup.insert(field.name.clone(), field.position) { - return Err(Details::FieldNameDuplicate(field.name.clone()).into()); - } - - if let Some(ref field_aliases) = field.aliases { - for alias in field_aliases { - lookup.insert(alias.clone(), field.position); - } - } - } - - let schema = Schema::Record(RecordSchema { - name: fully_qualified_name.clone(), - aliases: aliases.clone(), - doc: complex.doc(), - fields, - lookup, - attributes: self.get_custom_attributes(complex, vec!["fields"]), - }); - - self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); - Ok(schema) - } - - fn get_custom_attributes( - &self, - complex: &Map<String, Value>, - excluded: Vec<&'static str>, - ) -> BTreeMap<String, Value> { - let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new(); - for (key, value) in complex { - match key.as_str() { - "type" | "name" | "namespace" | "doc" | "aliases" | 
"logicalType" => continue, - candidate if excluded.contains(&candidate) => continue, - _ => custom_attributes.insert(key.clone(), value.clone()), - }; - } - custom_attributes - } - - /// Parse a `serde_json::Value` representing a Avro enum type into a - /// `Schema`. - fn parse_enum( - &mut self, - complex: &Map<String, Value>, - enclosing_namespace: &Namespace, - ) -> AvroResult<Schema> { - let symbols_opt = complex.get("symbols"); - - if symbols_opt.is_none() - && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) - { - return Ok(seen.clone()); - } - - let fully_qualified_name = Name::parse(complex, enclosing_namespace)?; - let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace); - - let symbols: Vec<String> = symbols_opt - .and_then(|v| v.as_array()) - .ok_or_else(|| Error::from(Details::GetEnumSymbolsField)) - .and_then(|symbols| { - symbols - .iter() - .map(|symbol| symbol.as_str().map(|s| s.to_string())) - .collect::<Option<_>>() - .ok_or_else(|| Error::from(Details::GetEnumSymbols)) - })?; - - let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len()); - for symbol in symbols.iter() { - validate_enum_symbol_name(symbol)?; - - // Ensure there are no duplicate symbols - if existing_symbols.contains(&symbol) { - return Err(Details::EnumSymbolDuplicate(symbol.to_string()).into()); - } - - existing_symbols.insert(symbol); - } - - let mut default: Option<String> = None; - if let Some(value) = complex.get("default") { - if let Value::String(ref s) = *value { - default = Some(s.clone()); - } else { - return Err(Details::EnumDefaultWrongType(value.clone()).into()); - } - } - - if let Some(ref value) = default { - let resolved = types::Value::from(value.clone()) - .resolve_enum(&symbols, &Some(value.to_string()), &None) - .is_ok(); - if !resolved { - return Err(Details::GetEnumDefault { - symbol: value.to_string(), - symbols, - } - .into()); - } - } - - let schema = 
Schema::Enum(EnumSchema { - name: fully_qualified_name.clone(), - aliases: aliases.clone(), - doc: complex.doc(), - symbols, - default, - attributes: self.get_custom_attributes(complex, vec!["symbols"]), - }); - - self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); - - Ok(schema) - } - - /// Parse a `serde_json::Value` representing a Avro array type into a - /// `Schema`. - fn parse_array( - &mut self, - complex: &Map<String, Value>, - enclosing_namespace: &Namespace, - ) -> AvroResult<Schema> { - complex - .get("items") - .ok_or_else(|| Details::GetArrayItemsField.into()) - .and_then(|items| self.parse(items, enclosing_namespace)) - .map(|items| { - Schema::array_with_attributes( - items, - self.get_custom_attributes(complex, vec!["items"]), - ) - }) - } - - /// Parse a `serde_json::Value` representing a Avro map type into a - /// `Schema`. - fn parse_map( - &mut self, - complex: &Map<String, Value>, - enclosing_namespace: &Namespace, - ) -> AvroResult<Schema> { - complex - .get("values") - .ok_or_else(|| Details::GetMapValuesField.into()) - .and_then(|items| self.parse(items, enclosing_namespace)) - .map(|items| { - Schema::map_with_attributes( - items, - self.get_custom_attributes(complex, vec!["values"]), - ) - }) - } - - /// Parse a `serde_json::Value` representing a Avro union type into a - /// `Schema`. - fn parse_union( - &mut self, - items: &[Value], - enclosing_namespace: &Namespace, - ) -> AvroResult<Schema> { - items - .iter() - .map(|v| self.parse(v, enclosing_namespace)) - .collect::<Result<Vec<_>, _>>() - .and_then(|schemas| { - if schemas.is_empty() { - error!( - "Union schemas should have at least two members! \ - Please enable debug logging to find out which Record schema \ - declares the union with 'RUST_LOG=apache_avro::schema=debug'." - ); - } else if schemas.len() == 1 { - warn!( - "Union schema with just one member! Consider dropping the union! 
\ - Please enable debug logging to find out which Record schema \ - declares the union with 'RUST_LOG=apache_avro::schema=debug'." - ); - } - Ok(Schema::Union(UnionSchema::new(schemas)?)) - }) - } - - /// Parse a `serde_json::Value` representing a Avro fixed type into a - /// `Schema`. - fn parse_fixed( - &mut self, - complex: &Map<String, Value>, - enclosing_namespace: &Namespace, - ) -> AvroResult<Schema> { - let size_opt = complex.get("size"); - if size_opt.is_none() - && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) - { - return Ok(seen.clone()); - } - - let doc = complex.get("doc").and_then(|v| match &v { - &Value::String(docstr) => Some(docstr.clone()), - _ => None, - }); - - let size = match size_opt { - Some(size) => size - .as_u64() - .ok_or_else(|| Details::GetFixedSizeFieldPositive(size.clone())), - None => Err(Details::GetFixedSizeField), - }?; - - let default = complex.get("default").and_then(|v| match &v { - &Value::String(default) => Some(default.clone()), - _ => None, - }); - - if default.is_some() { - let len = default.clone().unwrap().len(); - if len != size as usize { - return Err(Details::FixedDefaultLenSizeMismatch(len, size).into()); - } - } - - let fully_qualified_name = Name::parse(complex, enclosing_namespace)?; - let aliases = fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace); - - let schema = Schema::Fixed(FixedSchema { - name: fully_qualified_name.clone(), - aliases: aliases.clone(), - doc, - size: size as usize, - default, - attributes: self.get_custom_attributes(complex, vec!["size"]), - }); - - self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); - - Ok(schema) - } -} - -// A type alias may be specified either as a fully namespace-qualified, or relative -// to the namespace of the name it is an alias for. For example, if a type named "a.b" -// has aliases of "c" and "x.y", then the fully qualified names of its aliases are "a.c" -// and "x.y". 
-// https://avro.apache.org/docs/current/specification/#aliases -fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace) -> Aliases { - aliases.map(|aliases| { - aliases - .iter() - .map(|alias| { - if alias.find('.').is_none() { - match namespace { - Some(ns) => format!("{ns}.{alias}"), - None => alias.clone(), - } - } else { - alias.clone() - } - }) - .map(|alias| Alias::new(alias.as_str()).unwrap()) - .collect() - }) -} - -fn get_schema_type_name(name: Name, value: Value) -> Name { - match value.get("type") { - Some(Value::Object(complex_type)) => match complex_type.name() { - Some(name) => Name::new(name.as_str()).unwrap(), - _ => name, - }, - _ => name, - } -} - impl Serialize for Schema { fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> where diff --git a/avro/src/schema/parser.rs b/avro/src/schema/parser.rs new file mode 100644 index 0000000..ced79bc --- /dev/null +++ b/avro/src/schema/parser.rs @@ -0,0 +1,881 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use crate::error::Details; +use crate::schema::record::RecordSchemaParseLocation; +use crate::schema::{ + Alias, Aliases, DecimalMetadata, DecimalSchema, EnumSchema, FixedSchema, Name, Names, + Namespace, Precision, RecordField, RecordSchema, Scale, Schema, SchemaKind, UnionSchema, + UuidSchema, +}; +use crate::types; +use crate::util::MapHelper; +use crate::validator::validate_enum_symbol_name; +use crate::{AvroResult, Error}; +use log::{debug, error, warn}; +use serde_json::{Map, Value}; +use std::collections::{BTreeMap, HashMap, HashSet}; + +#[derive(Default)] +pub(crate) struct Parser { + input_schemas: HashMap<Name, Value>, + /// A map of name -> Schema::Ref + /// Used to resolve cyclic references, i.e. when a + /// field's type is a reference to its record's type + resolving_schemas: Names, + input_order: Vec<Name>, + /// A map of name -> fully parsed Schema + /// Used to avoid parsing the same schema twice + parsed_schemas: Names, +} + +impl Parser { + pub(crate) fn new( + input_schemas: HashMap<Name, Value>, + input_order: Vec<Name>, + parsed_schemas: Names, + ) -> Self { + Self { + input_schemas, + resolving_schemas: HashMap::default(), + input_order, + parsed_schemas, + } + } + + pub(crate) fn get_parsed_schemas(&mut self) -> &Names { + &self.parsed_schemas + } + + /// Create a `Schema` from a string representing a JSON Avro schema. + pub(super) fn parse_str(&mut self, input: &str) -> AvroResult<Schema> { + let value = serde_json::from_str(input).map_err(Details::ParseSchemaJson)?; + self.parse(&value, &None) + } + + /// Create an array of `Schema`'s from an iterator of JSON Avro schemas. It is allowed that + /// the schemas have cross-dependencies; these will be resolved during parsing. + pub(super) fn parse_list(&mut self) -> AvroResult<Vec<Schema>> { + self.parse_input_schemas()?; + + let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len()); + for name in self.input_order.drain(0..) 
{ + let parsed = self + .parsed_schemas + .remove(&name) + .expect("One of the input schemas was unexpectedly not parsed"); + parsed_schemas.push(parsed); + } + Ok(parsed_schemas) + } + + /// Convert the input schemas to parsed_schemas + pub(super) fn parse_input_schemas(&mut self) -> Result<(), Error> { + while !self.input_schemas.is_empty() { + let next_name = self + .input_schemas + .keys() + .next() + .expect("Input schemas unexpectedly empty") + .to_owned(); + let (name, value) = self + .input_schemas + .remove_entry(&next_name) + .expect("Key unexpectedly missing"); + let parsed = self.parse(&value, &None)?; + self.parsed_schemas + .insert(self.get_schema_type_name(name, value), parsed); + } + Ok(()) + } + + /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro + /// schema. + pub(super) fn parse( + &mut self, + value: &Value, + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + match *value { + Value::String(ref t) => self.parse_known_schema(t.as_str(), enclosing_namespace), + Value::Object(ref data) => { + self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root) + } + Value::Array(ref data) => self.parse_union(data, enclosing_namespace), + _ => Err(Details::ParseSchemaFromValidJson.into()), + } + } + + /// Parse a `serde_json::Value` representing an Avro type whose Schema is known into a + /// `Schema`. A Schema for a `serde_json::Value` is known if it is primitive or has + /// been parsed previously by the parsed and stored in its map of parsed_schemas. 
+ fn parse_known_schema( + &mut self, + name: &str, + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + match name { + "null" => Ok(Schema::Null), + "boolean" => Ok(Schema::Boolean), + "int" => Ok(Schema::Int), + "long" => Ok(Schema::Long), + "double" => Ok(Schema::Double), + "float" => Ok(Schema::Float), + "bytes" => Ok(Schema::Bytes), + "string" => Ok(Schema::String), + _ => self.fetch_schema_ref(name, enclosing_namespace), + } + } + + /// Given a name, tries to retrieve the parsed schema from `parsed_schemas`. + /// If a parsed schema is not found, it checks if a currently resolving + /// schema with that name exists. + /// If a resolving schema is not found, it checks if a json with that name exists + /// in `input_schemas` and then parses it (removing it from `input_schemas`) + /// and adds the parsed schema to `parsed_schemas`. + /// + /// This method allows schemas definitions that depend on other types to + /// parse their dependencies (or look them up if already parsed). + pub(super) fn fetch_schema_ref( + &mut self, + name: &str, + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + fn get_schema_ref(parsed: &Schema) -> Schema { + match parsed { + &Schema::Record(RecordSchema { ref name, .. }) + | &Schema::Enum(EnumSchema { ref name, .. }) + | &Schema::Fixed(FixedSchema { ref name, .. 
}) => { + Schema::Ref { name: name.clone() } + } + _ => parsed.clone(), + } + } + + let name = Name::new(name)?; + let fully_qualified_name = name.fully_qualified_name(enclosing_namespace); + + if self.parsed_schemas.contains_key(&fully_qualified_name) { + return Ok(Schema::Ref { + name: fully_qualified_name, + }); + } + if let Some(resolving_schema) = self.resolving_schemas.get(&fully_qualified_name) { + return Ok(resolving_schema.clone()); + } + + // For good error reporting we add this check + match name.name.as_str() { + "record" | "enum" | "fixed" => { + return Err(Details::InvalidSchemaRecord(name.to_string()).into()); + } + _ => (), + } + + let value = self + .input_schemas + .remove(&fully_qualified_name) + // TODO make a better descriptive error message here that conveys that a named schema cannot be found + .ok_or_else(|| Details::ParsePrimitive(fully_qualified_name.fullname(None)))?; + + // parsing a full schema from inside another schema. Other full schema will not inherit namespace + let parsed = self.parse(&value, &None)?; + self.parsed_schemas + .insert(self.get_schema_type_name(name, value), parsed.clone()); + + Ok(get_schema_ref(&parsed)) + } + + fn get_decimal_integer( + &self, + complex: &Map<String, Value>, + key: &'static str, + ) -> AvroResult<DecimalMetadata> { + match complex.get(key) { + Some(Value::Number(value)) => self.parse_json_integer_for_decimal(value), + None => { + if key == "scale" { + Ok(0) + } else { + Err(Details::GetDecimalMetadataFromJson(key).into()) + } + } + Some(value) => Err(Details::GetDecimalMetadataValueFromJson { + key: key.into(), + value: value.clone(), + } + .into()), + } + } + + fn parse_precision_and_scale( + &self, + complex: &Map<String, Value>, + ) -> AvroResult<(Precision, Scale)> { + let precision = self.get_decimal_integer(complex, "precision")?; + let scale = self.get_decimal_integer(complex, "scale")?; + + if precision < 1 { + return Err(Details::DecimalPrecisionMuBePositive { precision }.into()); + } + 
+ if precision < scale { + Err(Details::DecimalPrecisionLessThanScale { precision, scale }.into()) + } else { + Ok((precision, scale)) + } + } + + /// Parse a `serde_json::Value` representing a complex Avro type into a + /// `Schema`. + /// + /// Avro supports "recursive" definition of types. + /// e.g: {"type": {"type": "string"}} + pub(super) fn parse_complex( + &mut self, + complex: &Map<String, Value>, + enclosing_namespace: &Namespace, + parse_location: RecordSchemaParseLocation, + ) -> AvroResult<Schema> { + // Try to parse this as a native complex type. + fn parse_as_native_complex( + complex: &Map<String, Value>, + parser: &mut Parser, + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + match complex.get("type") { + Some(value) => match value { + Value::String(s) if s == "fixed" => { + parser.parse_fixed(complex, enclosing_namespace) + } + _ => parser.parse(value, enclosing_namespace), + }, + None => Err(Details::GetLogicalTypeField.into()), + } + } + + // This crate support some logical types natively, and this function tries to convert + // a native complex type with a logical type attribute to these logical types. + // This function: + // 1. Checks whether the native complex type is in the supported kinds. + // 2. If it is, using the convert function to convert the native complex type to + // a logical type. + fn try_convert_to_logical_type<F>( + logical_type: &str, + schema: Schema, + supported_schema_kinds: &[SchemaKind], + convert: F, + ) -> AvroResult<Schema> + where + F: Fn(Schema) -> AvroResult<Schema>, + { + let kind = SchemaKind::from(schema.clone()); + if supported_schema_kinds.contains(&kind) { + convert(schema) + } else { + warn!( + "Ignoring unknown logical type '{logical_type}' for schema of type: {schema:?}!" 
+ ); + Ok(schema) + } + } + + match complex.get("logicalType") { + Some(Value::String(t)) => match t.as_str() { + "decimal" => { + return try_convert_to_logical_type( + "decimal", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Fixed, SchemaKind::Bytes], + |inner| -> AvroResult<Schema> { + match self.parse_precision_and_scale(complex) { + Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema { + precision, + scale, + inner: inner.try_into()?, + })), + Err(err) => { + warn!("Ignoring invalid decimal logical type: {err}"); + Ok(inner) + } + } + }, + ); + } + "big-decimal" => { + return try_convert_to_logical_type( + "big-decimal", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Bytes], + |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) }, + ); + } + "uuid" => { + return try_convert_to_logical_type( + "uuid", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::String, SchemaKind::Fixed, SchemaKind::Bytes], + |schema| match schema { + Schema::String => Ok(Schema::Uuid(UuidSchema::String)), + Schema::Fixed(fixed @ FixedSchema { size: 16, .. }) => { + Ok(Schema::Uuid(UuidSchema::Fixed(fixed))) + } + Schema::Fixed(FixedSchema { size, .. }) => { + warn!( + "Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! 
Schema: {schema:?}" + ); + Ok(schema) + } + Schema::Bytes => Ok(Schema::Uuid(UuidSchema::Bytes)), + _ => { + warn!("Ignoring invalid uuid logical type for schema: {schema:?}"); + Ok(schema) + } + }, + ); + } + "date" => { + return try_convert_to_logical_type( + "date", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Int], + |_| -> AvroResult<Schema> { Ok(Schema::Date) }, + ); + } + "time-millis" => { + return try_convert_to_logical_type( + "date", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Int], + |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) }, + ); + } + "time-micros" => { + return try_convert_to_logical_type( + "time-micros", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Long], + |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) }, + ); + } + "timestamp-millis" => { + return try_convert_to_logical_type( + "timestamp-millis", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Long], + |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) }, + ); + } + "timestamp-micros" => { + return try_convert_to_logical_type( + "timestamp-micros", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Long], + |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) }, + ); + } + "timestamp-nanos" => { + return try_convert_to_logical_type( + "timestamp-nanos", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Long], + |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) }, + ); + } + "local-timestamp-millis" => { + return try_convert_to_logical_type( + "local-timestamp-millis", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Long], + |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) }, + ); + } + "local-timestamp-micros" => { + return try_convert_to_logical_type( + "local-timestamp-micros", + parse_as_native_complex(complex, self, 
enclosing_namespace)?, + &[SchemaKind::Long], + |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) }, + ); + } + "local-timestamp-nanos" => { + return try_convert_to_logical_type( + "local-timestamp-nanos", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Long], + |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) }, + ); + } + "duration" => { + return try_convert_to_logical_type( + "duration", + parse_as_native_complex(complex, self, enclosing_namespace)?, + &[SchemaKind::Fixed], + |schema| -> AvroResult<Schema> { + match schema { + Schema::Fixed(fixed @ FixedSchema { size: 12, .. }) => { + Ok(Schema::Duration(fixed)) + } + Schema::Fixed(FixedSchema { size, .. }) => { + warn!( + "Ignoring duration logical type on fixed type because size ({size}) is not 12! Schema: {schema:?}" + ); + Ok(schema) + } + _ => { + warn!( + "Ignoring invalid duration logical type for schema: {schema:?}" + ); + Ok(schema) + } + } + }, + ); + } + // In this case, of an unknown logical type, we just pass through the underlying + // type. + _ => {} + }, + // The spec says to ignore invalid logical types and just pass through the + // underlying type. It is unclear whether that applies to this case or not, where the + // `logicalType` is not a string. 
+ Some(value) => return Err(Details::GetLogicalTypeFieldType(value.clone()).into()), + _ => {} + } + match complex.get("type") { + Some(Value::String(t)) => match t.as_str() { + "record" => match parse_location { + RecordSchemaParseLocation::Root => { + self.parse_record(complex, enclosing_namespace) + } + RecordSchemaParseLocation::FromField => { + self.fetch_schema_ref(t, enclosing_namespace) + } + }, + "enum" => self.parse_enum(complex, enclosing_namespace), + "array" => self.parse_array(complex, enclosing_namespace), + "map" => self.parse_map(complex, enclosing_namespace), + "fixed" => self.parse_fixed(complex, enclosing_namespace), + other => self.parse_known_schema(other, enclosing_namespace), + }, + Some(Value::Object(data)) => { + self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root) + } + Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace), + Some(unknown) => Err(Details::GetComplexType(unknown.clone()).into()), + None => Err(Details::GetComplexTypeField.into()), + } + } + + fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) { + let resolving_schema = Schema::Ref { name: name.clone() }; + self.resolving_schemas + .insert(name.clone(), resolving_schema.clone()); + + let namespace = &name.namespace; + + if let Some(aliases) = aliases { + aliases.iter().for_each(|alias| { + let alias_fullname = alias.fully_qualified_name(namespace); + self.resolving_schemas + .insert(alias_fullname, resolving_schema.clone()); + }); + } + } + + fn register_parsed_schema( + &mut self, + fully_qualified_name: &Name, + schema: &Schema, + aliases: &Aliases, + ) { + // FIXME, this should be globally aware, so if there is something overwriting something + // else then there is an ambiguous schema definition. 
An appropriate error should be thrown + self.parsed_schemas + .insert(fully_qualified_name.clone(), schema.clone()); + self.resolving_schemas.remove(fully_qualified_name); + + let namespace = &fully_qualified_name.namespace; + + if let Some(aliases) = aliases { + aliases.iter().for_each(|alias| { + let alias_fullname = alias.fully_qualified_name(namespace); + self.resolving_schemas.remove(&alias_fullname); + self.parsed_schemas.insert(alias_fullname, schema.clone()); + }); + } + } + + /// Returns already parsed schema or a schema that is currently being resolved. + fn get_already_seen_schema( + &self, + complex: &Map<String, Value>, + enclosing_namespace: &Namespace, + ) -> Option<&Schema> { + match complex.get("type") { + Some(Value::String(typ)) => { + let name = Name::new(typ.as_str()) + .unwrap() + .fully_qualified_name(enclosing_namespace); + self.resolving_schemas + .get(&name) + .or_else(|| self.parsed_schemas.get(&name)) + } + _ => None, + } + } + + /// Parse a `serde_json::Value` representing a Avro record type into a + /// `Schema`. 
+ fn parse_record( + &mut self, + complex: &Map<String, Value>, + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + let fields_opt = complex.get("fields"); + + if fields_opt.is_none() + && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) + { + return Ok(seen.clone()); + } + + let fully_qualified_name = Name::parse(complex, enclosing_namespace)?; + let aliases = + self.fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace); + + let mut lookup = BTreeMap::new(); + + self.register_resolving_schema(&fully_qualified_name, &aliases); + + debug!("Going to parse record schema: {:?}", &fully_qualified_name); + + let fields: Vec<RecordField> = fields_opt + .and_then(|fields| fields.as_array()) + .ok_or_else(|| Error::new(Details::GetRecordFieldsJson)) + .and_then(|fields| { + fields + .iter() + .filter_map(|field| field.as_object()) + .enumerate() + .map(|(position, field)| { + RecordField::parse(field, position, self, &fully_qualified_name) + }) + .collect::<Result<_, _>>() + })?; + + for field in &fields { + if let Some(_old) = lookup.insert(field.name.clone(), field.position) { + return Err(Details::FieldNameDuplicate(field.name.clone()).into()); + } + + if let Some(ref field_aliases) = field.aliases { + for alias in field_aliases { + lookup.insert(alias.clone(), field.position); + } + } + } + + let schema = Schema::Record(RecordSchema { + name: fully_qualified_name.clone(), + aliases: aliases.clone(), + doc: complex.doc(), + fields, + lookup, + attributes: self.get_custom_attributes(complex, vec!["fields"]), + }); + + self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); + Ok(schema) + } + + fn get_custom_attributes( + &self, + complex: &Map<String, Value>, + excluded: Vec<&'static str>, + ) -> BTreeMap<String, Value> { + let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new(); + for (key, value) in complex { + match key.as_str() { + "type" | "name" | "namespace" | "doc" | "aliases" 
| "logicalType" => continue, + candidate if excluded.contains(&candidate) => continue, + _ => custom_attributes.insert(key.clone(), value.clone()), + }; + } + custom_attributes + } + + /// Parse a `serde_json::Value` representing a Avro enum type into a + /// `Schema`. + fn parse_enum( + &mut self, + complex: &Map<String, Value>, + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + let symbols_opt = complex.get("symbols"); + + if symbols_opt.is_none() + && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) + { + return Ok(seen.clone()); + } + + let fully_qualified_name = Name::parse(complex, enclosing_namespace)?; + let aliases = + self.fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace); + + let symbols: Vec<String> = symbols_opt + .and_then(|v| v.as_array()) + .ok_or_else(|| Error::from(Details::GetEnumSymbolsField)) + .and_then(|symbols| { + symbols + .iter() + .map(|symbol| symbol.as_str().map(|s| s.to_string())) + .collect::<Option<_>>() + .ok_or_else(|| Error::from(Details::GetEnumSymbols)) + })?; + + let mut existing_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len()); + for symbol in symbols.iter() { + validate_enum_symbol_name(symbol)?; + + // Ensure there are no duplicate symbols + if existing_symbols.contains(&symbol) { + return Err(Details::EnumSymbolDuplicate(symbol.to_string()).into()); + } + + existing_symbols.insert(symbol); + } + + let mut default: Option<String> = None; + if let Some(value) = complex.get("default") { + if let Value::String(ref s) = *value { + default = Some(s.clone()); + } else { + return Err(Details::EnumDefaultWrongType(value.clone()).into()); + } + } + + if let Some(ref value) = default { + let resolved = types::Value::from(value.clone()) + .resolve_enum(&symbols, &Some(value.to_string()), &None) + .is_ok(); + if !resolved { + return Err(Details::GetEnumDefault { + symbol: value.to_string(), + symbols, + } + .into()); + } + } + + let schema = 
Schema::Enum(EnumSchema { + name: fully_qualified_name.clone(), + aliases: aliases.clone(), + doc: complex.doc(), + symbols, + default, + attributes: self.get_custom_attributes(complex, vec!["symbols"]), + }); + + self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); + + Ok(schema) + } + + /// Parse a `serde_json::Value` representing a Avro array type into a + /// `Schema`. + fn parse_array( + &mut self, + complex: &Map<String, Value>, + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + complex + .get("items") + .ok_or_else(|| Details::GetArrayItemsField.into()) + .and_then(|items| self.parse(items, enclosing_namespace)) + .map(|items| { + Schema::array_with_attributes( + items, + self.get_custom_attributes(complex, vec!["items"]), + ) + }) + } + + /// Parse a `serde_json::Value` representing a Avro map type into a + /// `Schema`. + fn parse_map( + &mut self, + complex: &Map<String, Value>, + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + complex + .get("values") + .ok_or_else(|| Details::GetMapValuesField.into()) + .and_then(|items| self.parse(items, enclosing_namespace)) + .map(|items| { + Schema::map_with_attributes( + items, + self.get_custom_attributes(complex, vec!["values"]), + ) + }) + } + + /// Parse a `serde_json::Value` representing a Avro union type into a + /// `Schema`. + fn parse_union( + &mut self, + items: &[Value], + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + items + .iter() + .map(|v| self.parse(v, enclosing_namespace)) + .collect::<Result<Vec<_>, _>>() + .and_then(|schemas| { + if schemas.is_empty() { + error!( + "Union schemas should have at least two members! \ + Please enable debug logging to find out which Record schema \ + declares the union with 'RUST_LOG=apache_avro::schema=debug'." + ); + } else if schemas.len() == 1 { + warn!( + "Union schema with just one member! Consider dropping the union! 
\ + Please enable debug logging to find out which Record schema \ + declares the union with 'RUST_LOG=apache_avro::schema=debug'." + ); + } + Ok(Schema::Union(UnionSchema::new(schemas)?)) + }) + } + + /// Parse a `serde_json::Value` representing a Avro fixed type into a + /// `Schema`. + fn parse_fixed( + &mut self, + complex: &Map<String, Value>, + enclosing_namespace: &Namespace, + ) -> AvroResult<Schema> { + let size_opt = complex.get("size"); + if size_opt.is_none() + && let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) + { + return Ok(seen.clone()); + } + + let doc = complex.get("doc").and_then(|v| match &v { + &Value::String(docstr) => Some(docstr.clone()), + _ => None, + }); + + let size = match size_opt { + Some(size) => size + .as_u64() + .ok_or_else(|| Details::GetFixedSizeFieldPositive(size.clone())), + None => Err(Details::GetFixedSizeField), + }?; + + let default = complex.get("default").and_then(|v| match &v { + &Value::String(default) => Some(default.clone()), + _ => None, + }); + + if default.is_some() { + let len = default.clone().unwrap().len(); + if len != size as usize { + return Err(Details::FixedDefaultLenSizeMismatch(len, size).into()); + } + } + + let fully_qualified_name = Name::parse(complex, enclosing_namespace)?; + let aliases = + self.fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace); + + let schema = Schema::Fixed(FixedSchema { + name: fully_qualified_name.clone(), + aliases: aliases.clone(), + doc, + size: size as usize, + default, + attributes: self.get_custom_attributes(complex, vec!["size"]), + }); + + self.register_parsed_schema(&fully_qualified_name, &schema, &aliases); + + Ok(schema) + } + + // A type alias may be specified either as a fully namespace-qualified, or relative + // to the namespace of the name it is an alias for. For example, if a type named "a.b" + // has aliases of "c" and "x.y", then the fully qualified names of its aliases are "a.c" + // and "x.y". 
+ // https://avro.apache.org/docs/current/specification/#aliases + fn fix_aliases_namespace( + &self, + aliases: Option<Vec<String>>, + namespace: &Namespace, + ) -> Aliases { + aliases.map(|aliases| { + aliases + .iter() + .map(|alias| { + if alias.find('.').is_none() { + match namespace { + Some(ns) => format!("{ns}.{alias}"), + None => alias.clone(), + } + } else { + alias.clone() + } + }) + // NOTE(review): `unwrap()` panics if `Alias::new` rejects the qualified alias; confirm aliases are validated before this point. + .map(|alias| Alias::new(alias.as_str()).unwrap()) + .collect() + }) + } + + /// Returns the name declared by a nested `"type"` object of `value` when there is one, otherwise `name` unchanged. + // NOTE(review): `unwrap()` panics if the nested name is rejected by `Name::new`; confirm it is validated elsewhere. + fn get_schema_type_name(&self, name: Name, value: Value) -> Name { + match value.get("type") { + Some(Value::Object(complex_type)) => match complex_type.name() { + Some(name) => Name::new(name.as_str()).unwrap(), + _ => name, + }, + _ => name, + } + } + + /// Converts a JSON number to `DecimalMetadata`; numbers that are neither `u64` nor `i64`, or that do not fit the target type, are errors. + fn parse_json_integer_for_decimal( + &self, + value: &serde_json::Number, + ) -> AvroResult<DecimalMetadata> { + Ok(if value.is_u64() { + let num = value + .as_u64() + .ok_or_else(|| Details::GetU64FromJson(value.clone()))?; + num.try_into() + .map_err(|e| Details::ConvertU64ToUsize(e, num))? + } else if value.is_i64() { + let num = value + .as_i64() + .ok_or_else(|| Details::GetI64FromJson(value.clone()))?; + num.try_into() + .map_err(|e| Details::ConvertI64ToUsize(e, num))? + } else { + return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into()); + }) + } +} diff --git a/avro/src/schema/record/field.rs b/avro/src/schema/record/field.rs index 365190c..9a42310 100644 --- a/avro/src/schema/record/field.rs +++ b/avro/src/schema/record/field.rs @@ -92,7 +92,7 @@ impl RecordField { &schema, &name, &enclosing_record.fullname(None), - &parser.parsed_schemas, + parser.get_parsed_schemas(), &default, )?;
