This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c98949f chore: Extract Parser to its own module (src/schema/parser.rs) (#415)
c98949f is described below
commit c98949f05b1881df4779fd14c85c4640c1625626
Author: Martin Grigorov <[email protected]>
AuthorDate: Tue Jan 20 10:11:00 2026 +0200
chore: Extract Parser to its own module (src/schema/parser.rs) (#415)
Minor internal API changes to fix the build.
No API breaks!
---
avro/src/schema/mod.rs | 842 +-------------------------------------
avro/src/schema/parser.rs | 881 ++++++++++++++++++++++++++++++++++++++++
avro/src/schema/record/field.rs | 2 +-
3 files changed, 892 insertions(+), 833 deletions(-)
diff --git a/avro/src/schema/mod.rs b/avro/src/schema/mod.rs
index 42a0b80..d915f46 100644
--- a/avro/src/schema/mod.rs
+++ b/avro/src/schema/mod.rs
@@ -21,10 +21,9 @@ use crate::{
error::{Details, Error},
schema_equality, types,
util::MapHelper,
- validator::{validate_enum_symbol_name, validate_namespace, validate_schema_name},
+ validator::{validate_namespace, validate_schema_name},
};
use digest::Digest;
-use log::{debug, error, warn};
use serde::{
Deserialize, Serialize, Serializer,
ser::{SerializeMap, SerializeSeq},
@@ -47,6 +46,8 @@ pub use record::{
};
mod union;
pub use union::UnionSchema;
+mod parser;
+use parser::Parser;
/// Represents an Avro schema fingerprint
/// More information about Avro schema fingerprints can be found in the
@@ -780,37 +781,6 @@ type DecimalMetadata = usize;
pub(crate) type Precision = DecimalMetadata;
pub(crate) type Scale = DecimalMetadata;
-fn parse_json_integer_for_decimal(value: &serde_json::Number) -> Result<DecimalMetadata, Error> {
- Ok(if value.is_u64() {
- let num = value
- .as_u64()
- .ok_or_else(|| Details::GetU64FromJson(value.clone()))?;
- num.try_into()
- .map_err(|e| Details::ConvertU64ToUsize(e, num))?
- } else if value.is_i64() {
- let num = value
- .as_i64()
- .ok_or_else(|| Details::GetI64FromJson(value.clone()))?;
- num.try_into()
- .map_err(|e| Details::ConvertI64ToUsize(e, num))?
- } else {
- return Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into());
- })
-}
-
-#[derive(Default)]
-pub(crate) struct Parser {
- input_schemas: HashMap<Name, Value>,
- /// A map of name -> Schema::Ref
- /// Used to resolve cyclic references, i.e. when a
- /// field's type is a reference to its record's type
- resolving_schemas: Names,
- input_order: Vec<Name>,
- /// A map of name -> fully parsed Schema
- /// Used to avoid parsing the same schema twice
- parsed_schemas: Names,
-}
-
impl Schema {
/// Converts `self` into its [Parsing Canonical Form].
///
@@ -880,12 +850,11 @@ impl Schema {
return Err(Details::GetNameField.into());
}
}
- let mut parser = Parser {
+ let mut parser = Parser::new(
input_schemas,
- resolving_schemas: HashMap::default(),
input_order,
- parsed_schemas: HashMap::with_capacity(input_len),
- };
+ HashMap::with_capacity(input_len),
+ );
parser.parse_list()
}
@@ -922,12 +891,11 @@ impl Schema {
return Err(Details::GetNameField.into());
}
}
- let mut parser = Parser {
+ let mut parser = Parser::new(
input_schemas,
- resolving_schemas: HashMap::default(),
input_order,
- parsed_schemas: HashMap::with_capacity(schemata_len),
- };
+ HashMap::with_capacity(schemata_len),
+ );
parser.parse_input_schemas()?;
let value = serde_json::from_str(schema).map_err(Details::ParseSchemaJson)?;
@@ -954,12 +922,7 @@ impl Schema {
/// Parses an Avro schema from JSON.
/// Any `Schema::Ref`s must be known in the `names` map.
pub(crate) fn parse_with_names(value: &Value, names: Names) -> AvroResult<Schema> {
- let mut parser = Parser {
- input_schemas: HashMap::with_capacity(1),
- resolving_schemas: Names::default(),
- input_order: Vec::with_capacity(1),
- parsed_schemas: names,
- };
+ let mut parser = Parser::new(HashMap::with_capacity(1), Vec::with_capacity(1), names);
parser.parse(value, &None)
}
@@ -1129,791 +1092,6 @@ impl Schema {
}
}
-impl Parser {
- /// Create a `Schema` from a string representing a JSON Avro schema.
- fn parse_str(&mut self, input: &str) -> Result<Schema, Error> {
- let value =
serde_json::from_str(input).map_err(Details::ParseSchemaJson)?;
- self.parse(&value, &None)
- }
-
- /// Create an array of `Schema`'s from an iterator of JSON Avro schemas.
It is allowed that
- /// the schemas have cross-dependencies; these will be resolved during
parsing.
- fn parse_list(&mut self) -> Result<Vec<Schema>, Error> {
- self.parse_input_schemas()?;
-
- let mut parsed_schemas = Vec::with_capacity(self.parsed_schemas.len());
- for name in self.input_order.drain(0..) {
- let parsed = self
- .parsed_schemas
- .remove(&name)
- .expect("One of the input schemas was unexpectedly not
parsed");
- parsed_schemas.push(parsed);
- }
- Ok(parsed_schemas)
- }
-
- /// Convert the input schemas to parsed_schemas
- fn parse_input_schemas(&mut self) -> Result<(), Error> {
- while !self.input_schemas.is_empty() {
- let next_name = self
- .input_schemas
- .keys()
- .next()
- .expect("Input schemas unexpectedly empty")
- .to_owned();
- let (name, value) = self
- .input_schemas
- .remove_entry(&next_name)
- .expect("Key unexpectedly missing");
- let parsed = self.parse(&value, &None)?;
- self.parsed_schemas
- .insert(get_schema_type_name(name, value), parsed);
- }
- Ok(())
- }
-
- /// Create a `Schema` from a `serde_json::Value` representing a JSON Avro
- /// schema.
- fn parse(&mut self, value: &Value, enclosing_namespace: &Namespace) ->
AvroResult<Schema> {
- match *value {
- Value::String(ref t) => self.parse_known_schema(t.as_str(),
enclosing_namespace),
- Value::Object(ref data) => {
- self.parse_complex(data, enclosing_namespace,
RecordSchemaParseLocation::Root)
- }
- Value::Array(ref data) => self.parse_union(data,
enclosing_namespace),
- _ => Err(Details::ParseSchemaFromValidJson.into()),
- }
- }
-
- /// Parse a `serde_json::Value` representing an Avro type whose Schema is
known into a
- /// `Schema`. A Schema for a `serde_json::Value` is known if it is
primitive or has
- /// been parsed previously by the parsed and stored in its map of
parsed_schemas.
- fn parse_known_schema(
- &mut self,
- name: &str,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- match name {
- "null" => Ok(Schema::Null),
- "boolean" => Ok(Schema::Boolean),
- "int" => Ok(Schema::Int),
- "long" => Ok(Schema::Long),
- "double" => Ok(Schema::Double),
- "float" => Ok(Schema::Float),
- "bytes" => Ok(Schema::Bytes),
- "string" => Ok(Schema::String),
- _ => self.fetch_schema_ref(name, enclosing_namespace),
- }
- }
-
- /// Given a name, tries to retrieve the parsed schema from
`parsed_schemas`.
- /// If a parsed schema is not found, it checks if a currently resolving
- /// schema with that name exists.
- /// If a resolving schema is not found, it checks if a json with that name
exists
- /// in `input_schemas` and then parses it (removing it from
`input_schemas`)
- /// and adds the parsed schema to `parsed_schemas`.
- ///
- /// This method allows schemas definitions that depend on other types to
- /// parse their dependencies (or look them up if already parsed).
- fn fetch_schema_ref(
- &mut self,
- name: &str,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- fn get_schema_ref(parsed: &Schema) -> Schema {
- match parsed {
- &Schema::Record(RecordSchema { ref name, .. })
- | &Schema::Enum(EnumSchema { ref name, .. })
- | &Schema::Fixed(FixedSchema { ref name, .. }) => {
- Schema::Ref { name: name.clone() }
- }
- _ => parsed.clone(),
- }
- }
-
- let name = Name::new(name)?;
- let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
-
- if self.parsed_schemas.contains_key(&fully_qualified_name) {
- return Ok(Schema::Ref {
- name: fully_qualified_name,
- });
- }
- if let Some(resolving_schema) =
self.resolving_schemas.get(&fully_qualified_name) {
- return Ok(resolving_schema.clone());
- }
-
- // For good error reporting we add this check
- match name.name.as_str() {
- "record" | "enum" | "fixed" => {
- return
Err(Details::InvalidSchemaRecord(name.to_string()).into());
- }
- _ => (),
- }
-
- let value = self
- .input_schemas
- .remove(&fully_qualified_name)
- // TODO make a better descriptive error message here that conveys
that a named schema cannot be found
- .ok_or_else(||
Details::ParsePrimitive(fully_qualified_name.fullname(None)))?;
-
- // parsing a full schema from inside another schema. Other full schema
will not inherit namespace
- let parsed = self.parse(&value, &None)?;
- self.parsed_schemas
- .insert(get_schema_type_name(name, value), parsed.clone());
-
- Ok(get_schema_ref(&parsed))
- }
-
- fn parse_precision_and_scale(
- complex: &Map<String, Value>,
- ) -> Result<(Precision, Scale), Error> {
- fn get_decimal_integer(
- complex: &Map<String, Value>,
- key: &'static str,
- ) -> Result<DecimalMetadata, Error> {
- match complex.get(key) {
- Some(Value::Number(value)) =>
parse_json_integer_for_decimal(value),
- None => {
- if key == "scale" {
- Ok(0)
- } else {
- Err(Details::GetDecimalMetadataFromJson(key).into())
- }
- }
- Some(value) => Err(Details::GetDecimalMetadataValueFromJson {
- key: key.into(),
- value: value.clone(),
- }
- .into()),
- }
- }
- let precision = get_decimal_integer(complex, "precision")?;
- let scale = get_decimal_integer(complex, "scale")?;
-
- if precision < 1 {
- return Err(Details::DecimalPrecisionMuBePositive { precision
}.into());
- }
-
- if precision < scale {
- Err(Details::DecimalPrecisionLessThanScale { precision, scale
}.into())
- } else {
- Ok((precision, scale))
- }
- }
-
- /// Parse a `serde_json::Value` representing a complex Avro type into a
- /// `Schema`.
- ///
- /// Avro supports "recursive" definition of types.
- /// e.g: {"type": {"type": "string"}}
- fn parse_complex(
- &mut self,
- complex: &Map<String, Value>,
- enclosing_namespace: &Namespace,
- parse_location: RecordSchemaParseLocation,
- ) -> AvroResult<Schema> {
- // Try to parse this as a native complex type.
- fn parse_as_native_complex(
- complex: &Map<String, Value>,
- parser: &mut Parser,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- match complex.get("type") {
- Some(value) => match value {
- Value::String(s) if s == "fixed" => {
- parser.parse_fixed(complex, enclosing_namespace)
- }
- _ => parser.parse(value, enclosing_namespace),
- },
- None => Err(Details::GetLogicalTypeField.into()),
- }
- }
-
- // This crate support some logical types natively, and this function
tries to convert
- // a native complex type with a logical type attribute to these
logical types.
- // This function:
- // 1. Checks whether the native complex type is in the supported kinds.
- // 2. If it is, using the convert function to convert the native
complex type to
- // a logical type.
- fn try_convert_to_logical_type<F>(
- logical_type: &str,
- schema: Schema,
- supported_schema_kinds: &[SchemaKind],
- convert: F,
- ) -> AvroResult<Schema>
- where
- F: Fn(Schema) -> AvroResult<Schema>,
- {
- let kind = SchemaKind::from(schema.clone());
- if supported_schema_kinds.contains(&kind) {
- convert(schema)
- } else {
- warn!(
- "Ignoring unknown logical type '{logical_type}' for schema
of type: {schema:?}!"
- );
- Ok(schema)
- }
- }
-
- match complex.get("logicalType") {
- Some(Value::String(t)) => match t.as_str() {
- "decimal" => {
- return try_convert_to_logical_type(
- "decimal",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Fixed, SchemaKind::Bytes],
- |inner| -> AvroResult<Schema> {
- match Self::parse_precision_and_scale(complex) {
- Ok((precision, scale)) =>
Ok(Schema::Decimal(DecimalSchema {
- precision,
- scale,
- inner: inner.try_into()?,
- })),
- Err(err) => {
- warn!("Ignoring invalid decimal logical
type: {err}");
- Ok(inner)
- }
- }
- },
- );
- }
- "big-decimal" => {
- return try_convert_to_logical_type(
- "big-decimal",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Bytes],
- |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
- );
- }
- "uuid" => {
- return try_convert_to_logical_type(
- "uuid",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::String, SchemaKind::Fixed,
SchemaKind::Bytes],
- |schema| match schema {
- Schema::String =>
Ok(Schema::Uuid(UuidSchema::String)),
- Schema::Fixed(fixed @ FixedSchema { size: 16, ..
}) => {
- Ok(Schema::Uuid(UuidSchema::Fixed(fixed)))
- }
- Schema::Fixed(FixedSchema { size, .. }) => {
- warn!(
- "Ignoring uuid logical type for a Fixed
schema because its size ({size:?}) is not 16! Schema: {schema:?}"
- );
- Ok(schema)
- }
- Schema::Bytes =>
Ok(Schema::Uuid(UuidSchema::Bytes)),
- _ => {
- warn!("Ignoring invalid uuid logical type for
schema: {schema:?}");
- Ok(schema)
- }
- },
- );
- }
- "date" => {
- return try_convert_to_logical_type(
- "date",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Int],
- |_| -> AvroResult<Schema> { Ok(Schema::Date) },
- );
- }
- "time-millis" => {
- return try_convert_to_logical_type(
- "date",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Int],
- |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
- );
- }
- "time-micros" => {
- return try_convert_to_logical_type(
- "time-micros",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Long],
- |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
- );
- }
- "timestamp-millis" => {
- return try_convert_to_logical_type(
- "timestamp-millis",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Long],
- |_| -> AvroResult<Schema> {
Ok(Schema::TimestampMillis) },
- );
- }
- "timestamp-micros" => {
- return try_convert_to_logical_type(
- "timestamp-micros",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Long],
- |_| -> AvroResult<Schema> {
Ok(Schema::TimestampMicros) },
- );
- }
- "timestamp-nanos" => {
- return try_convert_to_logical_type(
- "timestamp-nanos",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Long],
- |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos)
},
- );
- }
- "local-timestamp-millis" => {
- return try_convert_to_logical_type(
- "local-timestamp-millis",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Long],
- |_| -> AvroResult<Schema> {
Ok(Schema::LocalTimestampMillis) },
- );
- }
- "local-timestamp-micros" => {
- return try_convert_to_logical_type(
- "local-timestamp-micros",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Long],
- |_| -> AvroResult<Schema> {
Ok(Schema::LocalTimestampMicros) },
- );
- }
- "local-timestamp-nanos" => {
- return try_convert_to_logical_type(
- "local-timestamp-nanos",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Long],
- |_| -> AvroResult<Schema> {
Ok(Schema::LocalTimestampNanos) },
- );
- }
- "duration" => {
- return try_convert_to_logical_type(
- "duration",
- parse_as_native_complex(complex, self,
enclosing_namespace)?,
- &[SchemaKind::Fixed],
- |schema| -> AvroResult<Schema> {
- match schema {
- Schema::Fixed(fixed @ FixedSchema { size: 12,
.. }) => {
- Ok(Schema::Duration(fixed))
- }
- Schema::Fixed(FixedSchema { size, .. }) => {
- warn!(
- "Ignoring duration logical type on
fixed type because size ({size}) is not 12! Schema: {schema:?}"
- );
- Ok(schema)
- }
- _ => {
- warn!(
- "Ignoring invalid duration logical
type for schema: {schema:?}"
- );
- Ok(schema)
- }
- }
- },
- );
- }
- // In this case, of an unknown logical type, we just pass
through the underlying
- // type.
- _ => {}
- },
- // The spec says to ignore invalid logical types and just pass
through the
- // underlying type. It is unclear whether that applies to this
case or not, where the
- // `logicalType` is not a string.
- Some(value) => return
Err(Details::GetLogicalTypeFieldType(value.clone()).into()),
- _ => {}
- }
- match complex.get("type") {
- Some(Value::String(t)) => match t.as_str() {
- "record" => match parse_location {
- RecordSchemaParseLocation::Root => {
- self.parse_record(complex, enclosing_namespace)
- }
- RecordSchemaParseLocation::FromField => {
- self.fetch_schema_ref(t, enclosing_namespace)
- }
- },
- "enum" => self.parse_enum(complex, enclosing_namespace),
- "array" => self.parse_array(complex, enclosing_namespace),
- "map" => self.parse_map(complex, enclosing_namespace),
- "fixed" => self.parse_fixed(complex, enclosing_namespace),
- other => self.parse_known_schema(other, enclosing_namespace),
- },
- Some(Value::Object(data)) => {
- self.parse_complex(data, enclosing_namespace,
RecordSchemaParseLocation::Root)
- }
- Some(Value::Array(variants)) => self.parse_union(variants,
enclosing_namespace),
- Some(unknown) =>
Err(Details::GetComplexType(unknown.clone()).into()),
- None => Err(Details::GetComplexTypeField.into()),
- }
- }
-
- fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
- let resolving_schema = Schema::Ref { name: name.clone() };
- self.resolving_schemas
- .insert(name.clone(), resolving_schema.clone());
-
- let namespace = &name.namespace;
-
- if let Some(aliases) = aliases {
- aliases.iter().for_each(|alias| {
- let alias_fullname = alias.fully_qualified_name(namespace);
- self.resolving_schemas
- .insert(alias_fullname, resolving_schema.clone());
- });
- }
- }
-
- fn register_parsed_schema(
- &mut self,
- fully_qualified_name: &Name,
- schema: &Schema,
- aliases: &Aliases,
- ) {
- // FIXME, this should be globally aware, so if there is something
overwriting something
- // else then there is an ambiguous schema definition. An appropriate
error should be thrown
- self.parsed_schemas
- .insert(fully_qualified_name.clone(), schema.clone());
- self.resolving_schemas.remove(fully_qualified_name);
-
- let namespace = &fully_qualified_name.namespace;
-
- if let Some(aliases) = aliases {
- aliases.iter().for_each(|alias| {
- let alias_fullname = alias.fully_qualified_name(namespace);
- self.resolving_schemas.remove(&alias_fullname);
- self.parsed_schemas.insert(alias_fullname, schema.clone());
- });
- }
- }
-
- /// Returns already parsed schema or a schema that is currently being
resolved.
- fn get_already_seen_schema(
- &self,
- complex: &Map<String, Value>,
- enclosing_namespace: &Namespace,
- ) -> Option<&Schema> {
- match complex.get("type") {
- Some(Value::String(typ)) => {
- let name = Name::new(typ.as_str())
- .unwrap()
- .fully_qualified_name(enclosing_namespace);
- self.resolving_schemas
- .get(&name)
- .or_else(|| self.parsed_schemas.get(&name))
- }
- _ => None,
- }
- }
-
- /// Parse a `serde_json::Value` representing a Avro record type into a
- /// `Schema`.
- fn parse_record(
- &mut self,
- complex: &Map<String, Value>,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- let fields_opt = complex.get("fields");
-
- if fields_opt.is_none()
- && let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace)
- {
- return Ok(seen.clone());
- }
-
- let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
- let aliases = fix_aliases_namespace(complex.aliases(),
&fully_qualified_name.namespace);
-
- let mut lookup = BTreeMap::new();
-
- self.register_resolving_schema(&fully_qualified_name, &aliases);
-
- debug!("Going to parse record schema: {:?}", &fully_qualified_name);
-
- let fields: Vec<RecordField> = fields_opt
- .and_then(|fields| fields.as_array())
- .ok_or_else(|| Error::new(Details::GetRecordFieldsJson))
- .and_then(|fields| {
- fields
- .iter()
- .filter_map(|field| field.as_object())
- .enumerate()
- .map(|(position, field)| {
- RecordField::parse(field, position, self,
&fully_qualified_name)
- })
- .collect::<Result<_, _>>()
- })?;
-
- for field in &fields {
- if let Some(_old) = lookup.insert(field.name.clone(),
field.position) {
- return
Err(Details::FieldNameDuplicate(field.name.clone()).into());
- }
-
- if let Some(ref field_aliases) = field.aliases {
- for alias in field_aliases {
- lookup.insert(alias.clone(), field.position);
- }
- }
- }
-
- let schema = Schema::Record(RecordSchema {
- name: fully_qualified_name.clone(),
- aliases: aliases.clone(),
- doc: complex.doc(),
- fields,
- lookup,
- attributes: self.get_custom_attributes(complex, vec!["fields"]),
- });
-
- self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
- Ok(schema)
- }
-
- fn get_custom_attributes(
- &self,
- complex: &Map<String, Value>,
- excluded: Vec<&'static str>,
- ) -> BTreeMap<String, Value> {
- let mut custom_attributes: BTreeMap<String, Value> = BTreeMap::new();
- for (key, value) in complex {
- match key.as_str() {
- "type" | "name" | "namespace" | "doc" | "aliases" |
"logicalType" => continue,
- candidate if excluded.contains(&candidate) => continue,
- _ => custom_attributes.insert(key.clone(), value.clone()),
- };
- }
- custom_attributes
- }
-
- /// Parse a `serde_json::Value` representing a Avro enum type into a
- /// `Schema`.
- fn parse_enum(
- &mut self,
- complex: &Map<String, Value>,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- let symbols_opt = complex.get("symbols");
-
- if symbols_opt.is_none()
- && let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace)
- {
- return Ok(seen.clone());
- }
-
- let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
- let aliases = fix_aliases_namespace(complex.aliases(),
&fully_qualified_name.namespace);
-
- let symbols: Vec<String> = symbols_opt
- .and_then(|v| v.as_array())
- .ok_or_else(|| Error::from(Details::GetEnumSymbolsField))
- .and_then(|symbols| {
- symbols
- .iter()
- .map(|symbol| symbol.as_str().map(|s| s.to_string()))
- .collect::<Option<_>>()
- .ok_or_else(|| Error::from(Details::GetEnumSymbols))
- })?;
-
- let mut existing_symbols: HashSet<&String> =
HashSet::with_capacity(symbols.len());
- for symbol in symbols.iter() {
- validate_enum_symbol_name(symbol)?;
-
- // Ensure there are no duplicate symbols
- if existing_symbols.contains(&symbol) {
- return
Err(Details::EnumSymbolDuplicate(symbol.to_string()).into());
- }
-
- existing_symbols.insert(symbol);
- }
-
- let mut default: Option<String> = None;
- if let Some(value) = complex.get("default") {
- if let Value::String(ref s) = *value {
- default = Some(s.clone());
- } else {
- return
Err(Details::EnumDefaultWrongType(value.clone()).into());
- }
- }
-
- if let Some(ref value) = default {
- let resolved = types::Value::from(value.clone())
- .resolve_enum(&symbols, &Some(value.to_string()), &None)
- .is_ok();
- if !resolved {
- return Err(Details::GetEnumDefault {
- symbol: value.to_string(),
- symbols,
- }
- .into());
- }
- }
-
- let schema = Schema::Enum(EnumSchema {
- name: fully_qualified_name.clone(),
- aliases: aliases.clone(),
- doc: complex.doc(),
- symbols,
- default,
- attributes: self.get_custom_attributes(complex, vec!["symbols"]),
- });
-
- self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
-
- Ok(schema)
- }
-
- /// Parse a `serde_json::Value` representing a Avro array type into a
- /// `Schema`.
- fn parse_array(
- &mut self,
- complex: &Map<String, Value>,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- complex
- .get("items")
- .ok_or_else(|| Details::GetArrayItemsField.into())
- .and_then(|items| self.parse(items, enclosing_namespace))
- .map(|items| {
- Schema::array_with_attributes(
- items,
- self.get_custom_attributes(complex, vec!["items"]),
- )
- })
- }
-
- /// Parse a `serde_json::Value` representing a Avro map type into a
- /// `Schema`.
- fn parse_map(
- &mut self,
- complex: &Map<String, Value>,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- complex
- .get("values")
- .ok_or_else(|| Details::GetMapValuesField.into())
- .and_then(|items| self.parse(items, enclosing_namespace))
- .map(|items| {
- Schema::map_with_attributes(
- items,
- self.get_custom_attributes(complex, vec!["values"]),
- )
- })
- }
-
- /// Parse a `serde_json::Value` representing a Avro union type into a
- /// `Schema`.
- fn parse_union(
- &mut self,
- items: &[Value],
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- items
- .iter()
- .map(|v| self.parse(v, enclosing_namespace))
- .collect::<Result<Vec<_>, _>>()
- .and_then(|schemas| {
- if schemas.is_empty() {
- error!(
- "Union schemas should have at least two members! \
- Please enable debug logging to find out which Record
schema \
- declares the union with
'RUST_LOG=apache_avro::schema=debug'."
- );
- } else if schemas.len() == 1 {
- warn!(
- "Union schema with just one member! Consider dropping
the union! \
- Please enable debug logging to find out which Record
schema \
- declares the union with
'RUST_LOG=apache_avro::schema=debug'."
- );
- }
- Ok(Schema::Union(UnionSchema::new(schemas)?))
- })
- }
-
- /// Parse a `serde_json::Value` representing a Avro fixed type into a
- /// `Schema`.
- fn parse_fixed(
- &mut self,
- complex: &Map<String, Value>,
- enclosing_namespace: &Namespace,
- ) -> AvroResult<Schema> {
- let size_opt = complex.get("size");
- if size_opt.is_none()
- && let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace)
- {
- return Ok(seen.clone());
- }
-
- let doc = complex.get("doc").and_then(|v| match &v {
- &Value::String(docstr) => Some(docstr.clone()),
- _ => None,
- });
-
- let size = match size_opt {
- Some(size) => size
- .as_u64()
- .ok_or_else(||
Details::GetFixedSizeFieldPositive(size.clone())),
- None => Err(Details::GetFixedSizeField),
- }?;
-
- let default = complex.get("default").and_then(|v| match &v {
- &Value::String(default) => Some(default.clone()),
- _ => None,
- });
-
- if default.is_some() {
- let len = default.clone().unwrap().len();
- if len != size as usize {
- return Err(Details::FixedDefaultLenSizeMismatch(len,
size).into());
- }
- }
-
- let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
- let aliases = fix_aliases_namespace(complex.aliases(),
&fully_qualified_name.namespace);
-
- let schema = Schema::Fixed(FixedSchema {
- name: fully_qualified_name.clone(),
- aliases: aliases.clone(),
- doc,
- size: size as usize,
- default,
- attributes: self.get_custom_attributes(complex, vec!["size"]),
- });
-
- self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
-
- Ok(schema)
- }
-}
-
-// A type alias may be specified either as a fully namespace-qualified, or
relative
-// to the namespace of the name it is an alias for. For example, if a type
named "a.b"
-// has aliases of "c" and "x.y", then the fully qualified names of its aliases
are "a.c"
-// and "x.y".
-// https://avro.apache.org/docs/current/specification/#aliases
-fn fix_aliases_namespace(aliases: Option<Vec<String>>, namespace: &Namespace)
-> Aliases {
- aliases.map(|aliases| {
- aliases
- .iter()
- .map(|alias| {
- if alias.find('.').is_none() {
- match namespace {
- Some(ns) => format!("{ns}.{alias}"),
- None => alias.clone(),
- }
- } else {
- alias.clone()
- }
- })
- .map(|alias| Alias::new(alias.as_str()).unwrap())
- .collect()
- })
-}
-
-fn get_schema_type_name(name: Name, value: Value) -> Name {
- match value.get("type") {
- Some(Value::Object(complex_type)) => match complex_type.name() {
- Some(name) => Name::new(name.as_str()).unwrap(),
- _ => name,
- },
- _ => name,
- }
-}
-
impl Serialize for Schema {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
diff --git a/avro/src/schema/parser.rs b/avro/src/schema/parser.rs
new file mode 100644
index 0000000..ced79bc
--- /dev/null
+++ b/avro/src/schema/parser.rs
@@ -0,0 +1,881 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::error::Details;
+use crate::schema::record::RecordSchemaParseLocation;
+use crate::schema::{
+ Alias, Aliases, DecimalMetadata, DecimalSchema, EnumSchema, FixedSchema,
Name, Names,
+ Namespace, Precision, RecordField, RecordSchema, Scale, Schema,
SchemaKind, UnionSchema,
+ UuidSchema,
+};
+use crate::types;
+use crate::util::MapHelper;
+use crate::validator::validate_enum_symbol_name;
+use crate::{AvroResult, Error};
+use log::{debug, error, warn};
+use serde_json::{Map, Value};
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+/// Stateful parser that turns JSON schema definitions into `Schema` values.
+///
+/// It keeps track of the inputs that are still pending, the named schemas
+/// that are currently being resolved and the schemas that are already parsed.
+#[derive(Default)]
+pub(crate) struct Parser {
+ /// JSON definitions that still have to be parsed, keyed by name.
+ /// Entries are removed from this map when they get parsed.
+ input_schemas: HashMap<Name, Value>,
+ /// A map of name -> Schema::Ref
+ /// Used to resolve cyclic references, i.e. when a
+ /// field's type is a reference to its record's type
+ resolving_schemas: Names,
+ /// Names of the input schemas; `parse_list` returns its results in this order.
+ input_order: Vec<Name>,
+ /// A map of name -> fully parsed Schema
+ /// Used to avoid parsing the same schema twice
+ parsed_schemas: Names,
+}
+
+impl Parser {
+    /// Build a parser from the schemas that still need parsing
+    /// (`input_schemas`), the order in which results should be returned
+    /// (`input_order`) and the schemas that are already parsed
+    /// (`parsed_schemas`).
+    pub(crate) fn new(
+        input_schemas: HashMap<Name, Value>,
+        input_order: Vec<Name>,
+        parsed_schemas: Names,
+    ) -> Self {
+        // Nothing is being resolved before parsing starts.
+        let resolving_schemas = HashMap::default();
+        Self {
+            input_schemas,
+            resolving_schemas,
+            input_order,
+            parsed_schemas,
+        }
+    }
+
+    /// Borrow the map of schemas that have been fully parsed so far.
+    ///
+    /// Only shared access is required, so this takes `&self`; callers that
+    /// hold a `&mut Parser` can still call it unchanged.
+    pub(crate) fn get_parsed_schemas(&self) -> &Names {
+        &self.parsed_schemas
+    }
+
+    /// Parse a string holding a JSON Avro schema into a `Schema`.
+    ///
+    /// Input that is not valid JSON is reported as `Details::ParseSchemaJson`.
+    pub(super) fn parse_str(&mut self, input: &str) -> AvroResult<Schema> {
+        let json: Value = serde_json::from_str(input).map_err(Details::ParseSchemaJson)?;
+        self.parse(&json, &None)
+    }
+
+    /// Parse all input schemas and return them in their original input order.
+    ///
+    /// The schemas are allowed to have cross-dependencies; these are resolved
+    /// during parsing.
+    pub(super) fn parse_list(&mut self) -> AvroResult<Vec<Schema>> {
+        self.parse_input_schemas()?;
+
+        // Move every parsed schema out of the map, following the input order.
+        let parsed_schemas = &mut self.parsed_schemas;
+        let ordered: Vec<Schema> = self
+            .input_order
+            .drain(..)
+            .map(|name| {
+                parsed_schemas
+                    .remove(&name)
+                    .expect("One of the input schemas was unexpectedly not parsed")
+            })
+            .collect();
+        Ok(ordered)
+    }
+
+    /// Parse every pending entry of `input_schemas` and store the results in
+    /// `parsed_schemas`, stopping at the first error.
+    pub(super) fn parse_input_schemas(&mut self) -> Result<(), Error> {
+        // Parsing one schema may consume further input schemas while resolving
+        // references, so the next key is looked up afresh on every iteration.
+        while let Some(next_name) = self.input_schemas.keys().next().cloned() {
+            let (name, value) = self
+                .input_schemas
+                .remove_entry(&next_name)
+                .expect("Key unexpectedly missing");
+            let parsed = self.parse(&value, &None)?;
+            let key = self.get_schema_type_name(name, value);
+            self.parsed_schemas.insert(key, parsed);
+        }
+        Ok(())
+    }
+
+    /// Parse a `serde_json::Value` holding a JSON Avro schema into a `Schema`.
+    ///
+    /// Strings are treated as type names, objects as complex types and arrays
+    /// as unions; every other kind of JSON value is rejected.
+    pub(super) fn parse(
+        &mut self,
+        value: &Value,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        match value {
+            Value::String(type_name) => self.parse_known_schema(type_name, enclosing_namespace),
+            Value::Object(object) => {
+                self.parse_complex(object, enclosing_namespace, RecordSchemaParseLocation::Root)
+            }
+            Value::Array(members) => self.parse_union(members, enclosing_namespace),
+            Value::Null | Value::Bool(_) | Value::Number(_) => {
+                Err(Details::ParseSchemaFromValidJson.into())
+            }
+        }
+    }
+
+    /// Resolve a type that is given by its name only.
+    ///
+    /// Primitive type names map directly to their `Schema`. Any other name is
+    /// handed to `fetch_schema_ref`, which looks it up among the named schemas
+    /// known to the parser.
+    fn parse_known_schema(
+        &mut self,
+        name: &str,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let primitive = match name {
+            "null" => Schema::Null,
+            "boolean" => Schema::Boolean,
+            "int" => Schema::Int,
+            "long" => Schema::Long,
+            "double" => Schema::Double,
+            "float" => Schema::Float,
+            "bytes" => Schema::Bytes,
+            "string" => Schema::String,
+            _ => return self.fetch_schema_ref(name, enclosing_namespace),
+        };
+        Ok(primitive)
+    }
+
+ /// Given a name, tries to retrieve the parsed schema from
`parsed_schemas`.
+ /// If a parsed schema is not found, it checks if a currently resolving
+ /// schema with that name exists.
+ /// If a resolving schema is not found, it checks if a json with that name
exists
+ /// in `input_schemas` and then parses it (removing it from
`input_schemas`)
+ /// and adds the parsed schema to `parsed_schemas`.
+ ///
+ /// This method allows schemas definitions that depend on other types to
+ /// parse their dependencies (or look them up if already parsed).
+ pub(super) fn fetch_schema_ref(
+ &mut self,
+ name: &str,
+ enclosing_namespace: &Namespace,
+ ) -> AvroResult<Schema> {
+ fn get_schema_ref(parsed: &Schema) -> Schema {
+ match parsed {
+ &Schema::Record(RecordSchema { ref name, .. })
+ | &Schema::Enum(EnumSchema { ref name, .. })
+ | &Schema::Fixed(FixedSchema { ref name, .. }) => {
+ Schema::Ref { name: name.clone() }
+ }
+ _ => parsed.clone(),
+ }
+ }
+
+ let name = Name::new(name)?;
+ let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
+
+ if self.parsed_schemas.contains_key(&fully_qualified_name) {
+ return Ok(Schema::Ref {
+ name: fully_qualified_name,
+ });
+ }
+ if let Some(resolving_schema) =
self.resolving_schemas.get(&fully_qualified_name) {
+ return Ok(resolving_schema.clone());
+ }
+
+ // For good error reporting we add this check
+ match name.name.as_str() {
+ "record" | "enum" | "fixed" => {
+ return
Err(Details::InvalidSchemaRecord(name.to_string()).into());
+ }
+ _ => (),
+ }
+
+ let value = self
+ .input_schemas
+ .remove(&fully_qualified_name)
+ // TODO make a better descriptive error message here that conveys
that a named schema cannot be found
+ .ok_or_else(||
Details::ParsePrimitive(fully_qualified_name.fullname(None)))?;
+
+ // parsing a full schema from inside another schema. Other full schema
will not inherit namespace
+ let parsed = self.parse(&value, &None)?;
+ self.parsed_schemas
+ .insert(self.get_schema_type_name(name, value), parsed.clone());
+
+ Ok(get_schema_ref(&parsed))
+ }
+
+    /// Read the decimal attribute `key` from `complex` as an integer.
+    ///
+    /// A missing `"scale"` yields 0; any other missing key is an error, and so
+    /// is a value that is not a JSON number.
+    fn get_decimal_integer(
+        &self,
+        complex: &Map<String, Value>,
+        key: &'static str,
+    ) -> AvroResult<DecimalMetadata> {
+        match complex.get(key) {
+            Some(Value::Number(number)) => self.parse_json_integer_for_decimal(number),
+            Some(other) => Err(Details::GetDecimalMetadataValueFromJson {
+                key: key.into(),
+                value: other.clone(),
+            }
+            .into()),
+            None if key == "scale" => Ok(0),
+            None => Err(Details::GetDecimalMetadataFromJson(key).into()),
+        }
+    }
+
+    /// Read and validate the `"precision"` and `"scale"` attributes of a
+    /// decimal logical type.
+    ///
+    /// The precision must be at least 1 and must not be smaller than the scale.
+    fn parse_precision_and_scale(
+        &self,
+        complex: &Map<String, Value>,
+    ) -> AvroResult<(Precision, Scale)> {
+        let precision = self.get_decimal_integer(complex, "precision")?;
+        let scale = self.get_decimal_integer(complex, "scale")?;
+
+        if precision < 1 {
+            return Err(Details::DecimalPrecisionMuBePositive { precision }.into());
+        }
+        if precision < scale {
+            return Err(Details::DecimalPrecisionLessThanScale { precision, scale }.into());
+        }
+
+        Ok((precision, scale))
+    }
+
+    /// Parse a `serde_json::Value` representing a complex Avro type into a
+    /// `Schema`.
+    ///
+    /// A recognised `logicalType` attribute is handled first: the underlying
+    /// type is parsed and, if its kind is supported by that logical type, it is
+    /// converted to the logical schema; otherwise a warning is logged and the
+    /// underlying schema is returned unchanged. Unrecognised logical type names
+    /// fall through to the plain `type` handling, while a `logicalType` that is
+    /// not a string is an error.
+    ///
+    /// Avro supports "recursive" definition of types.
+    /// e.g: {"type": {"type": "string"}}
+    pub(super) fn parse_complex(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+        parse_location: RecordSchemaParseLocation,
+    ) -> AvroResult<Schema> {
+        // Try to parse this as a native complex type.
+        fn parse_as_native_complex(
+            complex: &Map<String, Value>,
+            parser: &mut Parser,
+            enclosing_namespace: &Namespace,
+        ) -> AvroResult<Schema> {
+            match complex.get("type") {
+                Some(value) => match value {
+                    Value::String(s) if s == "fixed" => {
+                        parser.parse_fixed(complex, enclosing_namespace)
+                    }
+                    _ => parser.parse(value, enclosing_namespace),
+                },
+                None => Err(Details::GetLogicalTypeField.into()),
+            }
+        }
+
+        // This crate supports some logical types natively, and this function
+        // tries to convert a native complex type with a logical type attribute
+        // to these logical types.
+        // This function:
+        // 1. Checks whether the native complex type is in the supported kinds.
+        // 2. If it is, uses the convert function to convert the native complex
+        //    type to a logical type.
+        fn try_convert_to_logical_type<F>(
+            logical_type: &str,
+            schema: Schema,
+            supported_schema_kinds: &[SchemaKind],
+            convert: F,
+        ) -> AvroResult<Schema>
+        where
+            F: Fn(Schema) -> AvroResult<Schema>,
+        {
+            let kind = SchemaKind::from(schema.clone());
+            if supported_schema_kinds.contains(&kind) {
+                convert(schema)
+            } else {
+                warn!(
+                    "Ignoring unknown logical type '{logical_type}' for schema of type: {schema:?}!"
+                );
+                Ok(schema)
+            }
+        }
+
+        match complex.get("logicalType") {
+            Some(Value::String(t)) => match t.as_str() {
+                "decimal" => {
+                    return try_convert_to_logical_type(
+                        "decimal",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Fixed, SchemaKind::Bytes],
+                        |inner| -> AvroResult<Schema> {
+                            match self.parse_precision_and_scale(complex) {
+                                Ok((precision, scale)) => Ok(Schema::Decimal(DecimalSchema {
+                                    precision,
+                                    scale,
+                                    inner: inner.try_into()?,
+                                })),
+                                Err(err) => {
+                                    warn!("Ignoring invalid decimal logical type: {err}");
+                                    Ok(inner)
+                                }
+                            }
+                        },
+                    );
+                }
+                "big-decimal" => {
+                    return try_convert_to_logical_type(
+                        "big-decimal",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Bytes],
+                        |_| -> AvroResult<Schema> { Ok(Schema::BigDecimal) },
+                    );
+                }
+                "uuid" => {
+                    return try_convert_to_logical_type(
+                        "uuid",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::String, SchemaKind::Fixed, SchemaKind::Bytes],
+                        |schema| match schema {
+                            Schema::String => Ok(Schema::Uuid(UuidSchema::String)),
+                            Schema::Fixed(fixed @ FixedSchema { size: 16, .. }) => {
+                                Ok(Schema::Uuid(UuidSchema::Fixed(fixed)))
+                            }
+                            Schema::Fixed(FixedSchema { size, .. }) => {
+                                warn!(
+                                    "Ignoring uuid logical type for a Fixed schema because its size ({size:?}) is not 16! Schema: {schema:?}"
+                                );
+                                Ok(schema)
+                            }
+                            Schema::Bytes => Ok(Schema::Uuid(UuidSchema::Bytes)),
+                            _ => {
+                                warn!("Ignoring invalid uuid logical type for schema: {schema:?}");
+                                Ok(schema)
+                            }
+                        },
+                    );
+                }
+                "date" => {
+                    return try_convert_to_logical_type(
+                        "date",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Int],
+                        |_| -> AvroResult<Schema> { Ok(Schema::Date) },
+                    );
+                }
+                "time-millis" => {
+                    return try_convert_to_logical_type(
+                        // Report the actual logical type in the warning; this
+                        // used to be mislabelled as "date".
+                        "time-millis",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Int],
+                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMillis) },
+                    );
+                }
+                "time-micros" => {
+                    return try_convert_to_logical_type(
+                        "time-micros",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { Ok(Schema::TimeMicros) },
+                    );
+                }
+                "timestamp-millis" => {
+                    return try_convert_to_logical_type(
+                        "timestamp-millis",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampMillis) },
+                    );
+                }
+                "timestamp-micros" => {
+                    return try_convert_to_logical_type(
+                        "timestamp-micros",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampMicros) },
+                    );
+                }
+                "timestamp-nanos" => {
+                    return try_convert_to_logical_type(
+                        "timestamp-nanos",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { Ok(Schema::TimestampNanos) },
+                    );
+                }
+                "local-timestamp-millis" => {
+                    return try_convert_to_logical_type(
+                        "local-timestamp-millis",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMillis) },
+                    );
+                }
+                "local-timestamp-micros" => {
+                    return try_convert_to_logical_type(
+                        "local-timestamp-micros",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampMicros) },
+                    );
+                }
+                "local-timestamp-nanos" => {
+                    return try_convert_to_logical_type(
+                        "local-timestamp-nanos",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Long],
+                        |_| -> AvroResult<Schema> { Ok(Schema::LocalTimestampNanos) },
+                    );
+                }
+                "duration" => {
+                    return try_convert_to_logical_type(
+                        "duration",
+                        parse_as_native_complex(complex, self, enclosing_namespace)?,
+                        &[SchemaKind::Fixed],
+                        |schema| -> AvroResult<Schema> {
+                            match schema {
+                                Schema::Fixed(fixed @ FixedSchema { size: 12, .. }) => {
+                                    Ok(Schema::Duration(fixed))
+                                }
+                                Schema::Fixed(FixedSchema { size, .. }) => {
+                                    warn!(
+                                        "Ignoring duration logical type on fixed type because size ({size}) is not 12! Schema: {schema:?}"
+                                    );
+                                    Ok(schema)
+                                }
+                                _ => {
+                                    warn!(
+                                        "Ignoring invalid duration logical type for schema: {schema:?}"
+                                    );
+                                    Ok(schema)
+                                }
+                            }
+                        },
+                    );
+                }
+                // In this case, of an unknown logical type, we just pass
+                // through the underlying type.
+                _ => {}
+            },
+            // The spec says to ignore invalid logical types and just pass
+            // through the underlying type. It is unclear whether that applies
+            // to this case or not, where the `logicalType` is not a string.
+            Some(value) => return Err(Details::GetLogicalTypeFieldType(value.clone()).into()),
+            _ => {}
+        }
+
+        // No (recognised) logical type: dispatch on the plain "type" attribute.
+        match complex.get("type") {
+            Some(Value::String(t)) => match t.as_str() {
+                "record" => match parse_location {
+                    RecordSchemaParseLocation::Root => {
+                        self.parse_record(complex, enclosing_namespace)
+                    }
+                    RecordSchemaParseLocation::FromField => {
+                        self.fetch_schema_ref(t, enclosing_namespace)
+                    }
+                },
+                "enum" => self.parse_enum(complex, enclosing_namespace),
+                "array" => self.parse_array(complex, enclosing_namespace),
+                "map" => self.parse_map(complex, enclosing_namespace),
+                "fixed" => self.parse_fixed(complex, enclosing_namespace),
+                other => self.parse_known_schema(other, enclosing_namespace),
+            },
+            Some(Value::Object(data)) => {
+                self.parse_complex(data, enclosing_namespace, RecordSchemaParseLocation::Root)
+            }
+            Some(Value::Array(variants)) => self.parse_union(variants, enclosing_namespace),
+            Some(unknown) => Err(Details::GetComplexType(unknown.clone()).into()),
+            None => Err(Details::GetComplexTypeField.into()),
+        }
+    }
+
+    /// Record that the schema called `name` is currently being parsed.
+    ///
+    /// A `Schema::Ref` to `name` is stored in `resolving_schemas` under the
+    /// name itself and under each of its aliases, qualified with the
+    /// namespace of `name`.
+    fn register_resolving_schema(&mut self, name: &Name, aliases: &Aliases) {
+        let placeholder = Schema::Ref { name: name.clone() };
+        self.resolving_schemas
+            .insert(name.clone(), placeholder.clone());
+
+        for alias in aliases.iter().flatten() {
+            let alias_fullname = alias.fully_qualified_name(&name.namespace);
+            self.resolving_schemas
+                .insert(alias_fullname, placeholder.clone());
+        }
+    }
+
+    /// Store a fully parsed schema under its name and all of its aliases.
+    ///
+    /// The matching entries in `resolving_schemas` are removed, because the
+    /// schema is no longer in progress.
+    fn register_parsed_schema(
+        &mut self,
+        fully_qualified_name: &Name,
+        schema: &Schema,
+        aliases: &Aliases,
+    ) {
+        // FIXME, this should be globally aware, so if there is something
+        // overwriting something else then there is an ambiguous schema
+        // definition. An appropriate error should be thrown
+        self.parsed_schemas
+            .insert(fully_qualified_name.clone(), schema.clone());
+        self.resolving_schemas.remove(fully_qualified_name);
+
+        for alias in aliases.iter().flatten() {
+            let alias_fullname = alias.fully_qualified_name(&fully_qualified_name.namespace);
+            self.resolving_schemas.remove(&alias_fullname);
+            self.parsed_schemas.insert(alias_fullname, schema.clone());
+        }
+    }
+
+    /// Returns an already parsed schema or a schema that is currently being
+    /// resolved.
+    ///
+    /// The lookup key is the string value of the `"type"` attribute, qualified
+    /// with `enclosing_namespace`. `None` is returned when `"type"` is missing,
+    /// is not a string, is not a valid name, or is not known to the parser.
+    fn get_already_seen_schema(
+        &self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> Option<&Schema> {
+        let Some(Value::String(type_name)) = complex.get("type") else {
+            return None;
+        };
+
+        // The name comes from the input JSON: an invalid name is reported as
+        // "not seen" instead of panicking.
+        let name = Name::new(type_name.as_str())
+            .ok()?
+            .fully_qualified_name(enclosing_namespace);
+
+        self.resolving_schemas
+            .get(&name)
+            .or_else(|| self.parsed_schemas.get(&name))
+    }
+
+    /// Parse a `serde_json::Value` representing an Avro record type into a
+    /// `Schema`.
+    ///
+    /// When `"fields"` is absent and the type is already known to the parser,
+    /// that known schema is returned. Otherwise the record is registered as
+    /// "resolving" before its fields are parsed, so that fields may refer back
+    /// to the record itself.
+    fn parse_record(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let fields_json = complex.get("fields");
+
+        if fields_json.is_none() {
+            if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
+                return Ok(seen.clone());
+            }
+        }
+
+        let record_name = Name::parse(complex, enclosing_namespace)?;
+        let aliases = self.fix_aliases_namespace(complex.aliases(), &record_name.namespace);
+
+        self.register_resolving_schema(&record_name, &aliases);
+
+        debug!("Going to parse record schema: {:?}", &record_name);
+
+        let Some(raw_fields) = fields_json.and_then(Value::as_array) else {
+            return Err(Error::new(Details::GetRecordFieldsJson));
+        };
+
+        // Entries that are not JSON objects are skipped and do not take up a position.
+        let mut fields: Vec<RecordField> = Vec::new();
+        for (position, field) in raw_fields
+            .iter()
+            .filter_map(Value::as_object)
+            .enumerate()
+        {
+            fields.push(RecordField::parse(field, position, self, &record_name)?);
+        }
+
+        // Map field names (and field aliases) to the position of the field.
+        let mut lookup = BTreeMap::new();
+        for field in &fields {
+            if lookup.insert(field.name.clone(), field.position).is_some() {
+                return Err(Details::FieldNameDuplicate(field.name.clone()).into());
+            }
+
+            for alias in field.aliases.iter().flatten() {
+                lookup.insert(alias.clone(), field.position);
+            }
+        }
+
+        let schema = Schema::Record(RecordSchema {
+            name: record_name.clone(),
+            aliases: aliases.clone(),
+            doc: complex.doc(),
+            fields,
+            lookup,
+            attributes: self.get_custom_attributes(complex, vec!["fields"]),
+        });
+
+        self.register_parsed_schema(&record_name, &schema, &aliases);
+        Ok(schema)
+    }
+
+    /// Collect the custom attributes of a schema definition.
+    ///
+    /// Every key of `complex` is kept, except for the common schema keys
+    /// ("type", "name", "namespace", "doc", "aliases", "logicalType") and the
+    /// keys listed in `excluded`.
+    fn get_custom_attributes(
+        &self,
+        complex: &Map<String, Value>,
+        excluded: Vec<&'static str>,
+    ) -> BTreeMap<String, Value> {
+        const RESERVED: [&str; 6] = ["type", "name", "namespace", "doc", "aliases", "logicalType"];
+
+        complex
+            .iter()
+            .filter(|(key, _)| {
+                let key = key.as_str();
+                !RESERVED.contains(&key) && !excluded.contains(&key)
+            })
+            .map(|(key, value)| (key.clone(), value.clone()))
+            .collect()
+    }
+
+    /// Parse a `serde_json::Value` representing an Avro enum type into a
+    /// `Schema`.
+    ///
+    /// When `"symbols"` is absent and the type is already known to the parser,
+    /// that known schema is returned. Symbols must be valid, unique strings,
+    /// and a `"default"`, if present, must be a string that resolves against
+    /// the symbols.
+    fn parse_enum(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let symbols_json = complex.get("symbols");
+
+        if symbols_json.is_none() {
+            if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
+                return Ok(seen.clone());
+            }
+        }
+
+        let enum_name = Name::parse(complex, enclosing_namespace)?;
+        let aliases = self.fix_aliases_namespace(complex.aliases(), &enum_name.namespace);
+
+        let raw_symbols = symbols_json
+            .and_then(Value::as_array)
+            .ok_or_else(|| Error::from(Details::GetEnumSymbolsField))?;
+
+        // Every symbol has to be a JSON string.
+        let mut symbols: Vec<String> = Vec::new();
+        for raw in raw_symbols {
+            match raw.as_str() {
+                Some(symbol) => symbols.push(symbol.to_string()),
+                None => return Err(Error::from(Details::GetEnumSymbols)),
+            }
+        }
+
+        let mut seen_symbols: HashSet<&String> = HashSet::with_capacity(symbols.len());
+        for symbol in &symbols {
+            validate_enum_symbol_name(symbol)?;
+
+            // Ensure there are no duplicate symbols
+            if !seen_symbols.insert(symbol) {
+                return Err(Details::EnumSymbolDuplicate(symbol.to_string()).into());
+            }
+        }
+
+        let default: Option<String> = match complex.get("default") {
+            None => None,
+            Some(Value::String(s)) => Some(s.clone()),
+            Some(other) => return Err(Details::EnumDefaultWrongType(other.clone()).into()),
+        };
+
+        // The default has to resolve against the declared symbols.
+        if let Some(value) = &default {
+            let resolved = types::Value::from(value.clone())
+                .resolve_enum(&symbols, &Some(value.to_string()), &None)
+                .is_ok();
+            if !resolved {
+                return Err(Details::GetEnumDefault {
+                    symbol: value.to_string(),
+                    symbols,
+                }
+                .into());
+            }
+        }
+
+        let schema = Schema::Enum(EnumSchema {
+            name: enum_name.clone(),
+            aliases: aliases.clone(),
+            doc: complex.doc(),
+            symbols,
+            default,
+            attributes: self.get_custom_attributes(complex, vec!["symbols"]),
+        });
+
+        self.register_parsed_schema(&enum_name, &schema, &aliases);
+
+        Ok(schema)
+    }
+
+    /// Parse a `serde_json::Value` representing an Avro array type into a
+    /// `Schema`.
+    ///
+    /// The element type is read from the mandatory `"items"` attribute.
+    fn parse_array(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let Some(items) = complex.get("items") else {
+            return Err(Details::GetArrayItemsField.into());
+        };
+
+        let item_schema = self.parse(items, enclosing_namespace)?;
+        let attributes = self.get_custom_attributes(complex, vec!["items"]);
+
+        Ok(Schema::array_with_attributes(item_schema, attributes))
+    }
+
+    /// Parse a `serde_json::Value` representing an Avro map type into a
+    /// `Schema`.
+    ///
+    /// The value type is read from the mandatory `"values"` attribute.
+    fn parse_map(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let Some(values) = complex.get("values") else {
+            return Err(Details::GetMapValuesField.into());
+        };
+
+        let value_schema = self.parse(values, enclosing_namespace)?;
+        let attributes = self.get_custom_attributes(complex, vec!["values"]);
+
+        Ok(Schema::map_with_attributes(value_schema, attributes))
+    }
+
+    /// Parse a `serde_json::Value` representing an Avro union type into a
+    /// `Schema`.
+    ///
+    /// Every member is parsed in order. A union with no members logs an error
+    /// and a union with a single member logs a warning; in both cases the
+    /// union is still handed to `UnionSchema::new`.
+    fn parse_union(
+        &mut self,
+        items: &[Value],
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let mut members = Vec::new();
+        for item in items {
+            members.push(self.parse(item, enclosing_namespace)?);
+        }
+
+        match members.len() {
+            0 => error!(
+                "Union schemas should have at least two members! \
+                 Please enable debug logging to find out which Record schema \
+                 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
+            ),
+            1 => warn!(
+                "Union schema with just one member! Consider dropping the union! \
+                 Please enable debug logging to find out which Record schema \
+                 declares the union with 'RUST_LOG=apache_avro::schema=debug'."
+            ),
+            _ => {}
+        }
+
+        Ok(Schema::Union(UnionSchema::new(members)?))
+    }
+
+    /// Parse a `serde_json::Value` representing an Avro fixed type into a
+    /// `Schema`.
+    ///
+    /// When `"size"` is absent and the type is already known to the parser,
+    /// that known schema is returned. Otherwise `"size"` must be a
+    /// non-negative integer that fits into `usize`. A string `"default"` must
+    /// have exactly `size` bytes; a `"default"` that is not a string is ignored.
+    fn parse_fixed(
+        &mut self,
+        complex: &Map<String, Value>,
+        enclosing_namespace: &Namespace,
+    ) -> AvroResult<Schema> {
+        let size_json = complex.get("size");
+        if size_json.is_none() {
+            if let Some(seen) = self.get_already_seen_schema(complex, enclosing_namespace) {
+                return Ok(seen.clone());
+            }
+        }
+
+        let doc = complex
+            .get("doc")
+            .and_then(Value::as_str)
+            .map(str::to_owned);
+
+        let size_value = size_json.ok_or(Details::GetFixedSizeField)?;
+        let size = size_value
+            .as_u64()
+            .ok_or_else(|| Details::GetFixedSizeFieldPositive(size_value.clone()))?;
+        // Checked conversion: an `as` cast would silently truncate sizes that
+        // do not fit into `usize` on targets with a narrower pointer width.
+        let size_usize =
+            usize::try_from(size).map_err(|e| Details::ConvertU64ToUsize(e, size))?;
+
+        let default = complex
+            .get("default")
+            .and_then(Value::as_str)
+            .map(str::to_owned);
+
+        if let Some(default) = &default {
+            let len = default.len();
+            if len != size_usize {
+                return Err(Details::FixedDefaultLenSizeMismatch(len, size).into());
+            }
+        }
+
+        let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
+        let aliases =
+            self.fix_aliases_namespace(complex.aliases(), &fully_qualified_name.namespace);
+
+        let schema = Schema::Fixed(FixedSchema {
+            name: fully_qualified_name.clone(),
+            aliases: aliases.clone(),
+            doc,
+            size: size_usize,
+            default,
+            attributes: self.get_custom_attributes(complex, vec!["size"]),
+        });
+
+        self.register_parsed_schema(&fully_qualified_name, &schema, &aliases);
+
+        Ok(schema)
+    }
+
+    // Qualify the aliases of a named type.
+    //
+    // A type alias may be specified either as fully namespace-qualified, or
+    // relative to the namespace of the name it is an alias for. For example,
+    // if a type named "a.b" has aliases of "c" and "x.y", then the fully
+    // qualified names of its aliases are "a.c" and "x.y".
+    // https://avro.apache.org/docs/current/specification/#aliases
+    //
+    // Panics if `Alias::new` rejects one of the resulting names.
+    fn fix_aliases_namespace(
+        &self,
+        aliases: Option<Vec<String>>,
+        namespace: &Namespace,
+    ) -> Aliases {
+        aliases.map(|names| {
+            names
+                .iter()
+                .map(|alias| {
+                    // Only unqualified aliases inherit the namespace.
+                    let qualified = match namespace {
+                        Some(ns) if !alias.contains('.') => format!("{ns}.{alias}"),
+                        _ => alias.clone(),
+                    };
+                    Alias::new(qualified.as_str()).unwrap()
+                })
+                .collect()
+        })
+    }
+
+    /// Pick the name under which a parsed input schema is stored.
+    ///
+    /// When the `"type"` attribute of `value` is itself an object carrying a
+    /// valid name, that nested name is used; in every other case the supplied
+    /// `name` is returned.
+    fn get_schema_type_name(&self, name: Name, value: Value) -> Name {
+        let nested_name = match value.get("type") {
+            Some(Value::Object(complex_type)) => complex_type.name(),
+            _ => None,
+        };
+
+        // The nested name is read straight from the input JSON and may not
+        // have been validated (e.g. a "name" attribute on a type object that is
+        // not a named type), so fall back to `name` instead of panicking.
+        nested_name
+            .and_then(|nested| Name::new(nested.as_str()).ok())
+            .unwrap_or(name)
+    }
+
+    /// Convert a JSON number into the integer type used for decimal
+    /// precision and scale.
+    ///
+    /// Numbers that are neither `u64` nor `i64` (for instance floats), and
+    /// integers that do not fit into `DecimalMetadata`, are rejected.
+    fn parse_json_integer_for_decimal(
+        &self,
+        value: &serde_json::Number,
+    ) -> AvroResult<DecimalMetadata> {
+        if let Some(num) = value.as_u64() {
+            let converted = DecimalMetadata::try_from(num)
+                .map_err(|e| Details::ConvertU64ToUsize(e, num))?;
+            Ok(converted)
+        } else if let Some(num) = value.as_i64() {
+            let converted = DecimalMetadata::try_from(num)
+                .map_err(|e| Details::ConvertI64ToUsize(e, num))?;
+            Ok(converted)
+        } else {
+            Err(Details::GetPrecisionOrScaleFromJson(value.clone()).into())
+        }
+    }
+}
diff --git a/avro/src/schema/record/field.rs b/avro/src/schema/record/field.rs
index 365190c..9a42310 100644
--- a/avro/src/schema/record/field.rs
+++ b/avro/src/schema/record/field.rs
@@ -92,7 +92,7 @@ impl RecordField {
&schema,
&name,
&enclosing_record.fullname(None),
- &parser.parsed_schemas,
+ parser.get_parsed_schemas(),
&default,
)?;