This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new e8a760d feat!: Rework schema compatibility (#342)
e8a760d is described below
commit e8a760dc3e2bb4c897f519440f7163c923992303
Author: Kriskras99 <[email protected]>
AuthorDate: Thu Jan 8 04:06:46 2026 +0100
feat!: Rework schema compatibility (#342)
* feat!: Rework schema compatibility
This new implementation is (in my opinion) simpler than the previous
iteration and easier to modify when needed.
I've introduced the concept of partial and full compatibility. Partial
compatibility means that there are incompatibilities, but whether reading
succeeds depends on what was written. For example, a writer enum `[A, B, C]`
is partially compatible with a reader enum `[A, B]`: reading only succeeds
if the writer never wrote `C`.
Users who want to be absolutely sure can treat `Compatibility::Partial`
as an error; others can use it to gradually migrate from an old schema.
* feat: Bump MSRV to 1.88
* test: Check if Decimal of different inner types can resolve
* fix: If both writer and reader schemas are Decimal, then precision and
scale must be checked
---------
Co-authored-by: default <[email protected]>
---
.github/workflows/test-lang-rust-ci.yml | 2 +-
.github/workflows/test-lang-rust-clippy.yml | 2 +-
Cargo.toml | 2 +-
avro/README.md | 2 +-
avro/src/duration.rs | 2 +-
avro/src/error.rs | 17 +-
avro/src/lib.rs | 2 +-
avro/src/schema.rs | 34 +-
avro/src/schema_compatibility.rs | 1089 +++++++++++----------------
avro/src/types.rs | 32 +-
avro/tests/shared.rs | 41 +-
11 files changed, 522 insertions(+), 703 deletions(-)
diff --git a/.github/workflows/test-lang-rust-ci.yml
b/.github/workflows/test-lang-rust-ci.yml
index 4665303..a0a10ac 100644
--- a/.github/workflows/test-lang-rust-ci.yml
+++ b/.github/workflows/test-lang-rust-ci.yml
@@ -52,7 +52,7 @@ jobs:
- "stable"
- "beta"
- "nightly"
- - "1.86.0" # MSRV
+ - "1.88.0" # MSRV
runner:
- name: ubuntu-24.04
target: x86_64-unknown-linux-gnu
diff --git a/.github/workflows/test-lang-rust-clippy.yml
b/.github/workflows/test-lang-rust-clippy.yml
index 7516229..dc8de55 100644
--- a/.github/workflows/test-lang-rust-clippy.yml
+++ b/.github/workflows/test-lang-rust-clippy.yml
@@ -40,7 +40,7 @@ jobs:
matrix:
rust:
- 'stable'
- - '1.86.0' # MSRV
+ - '1.88.0' # MSRV
steps:
- uses: actions/checkout@v6
- uses: dtolnay/rust-toolchain@stable
diff --git a/Cargo.toml b/Cargo.toml
index 3961b3d..0d8acc6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -33,7 +33,7 @@ version = "0.22.0"
license = "Apache-2.0"
repository = "https://github.com/apache/avro-rs"
edition = "2024"
-rust-version = "1.86.0"
+rust-version = "1.88.0"
keywords = ["avro", "data", "serialization"]
categories = ["encoding"]
documentation = "https://docs.rs/apache-avro"
diff --git a/avro/README.md b/avro/README.md
index d9d7922..fb0282c 100644
--- a/avro/README.md
+++ b/avro/README.md
@@ -115,7 +115,7 @@ versions. If you have troubles upgrading, check the release
notes.
## Minimum supported Rust version
-1.86.0
+1.88.0
## Defining a schema
diff --git a/avro/src/duration.rs b/avro/src/duration.rs
index ebaa469..c229171 100644
--- a/avro/src/duration.rs
+++ b/avro/src/duration.rs
@@ -230,7 +230,7 @@ mod tests {
assert_eq!(b, vec![7, 0, 0, 0, 4, 0, 0, 0, 45, 0, 0, 0]);
}
_ => {
- Err(anyhow!("Expected a Bytes value but got {:?}", ser_val))?;
+ Err(anyhow!("Expected a Bytes value but got {ser_val:?}"))?;
}
}
Ok(())
diff --git a/avro/src/error.rs b/avro/src/error.rs
index b2b0211..48eb91b 100644
--- a/avro/src/error.rs
+++ b/avro/src/error.rs
@@ -611,15 +611,28 @@ pub enum CompatibilityError {
#[error("Incompatible schemata! Field '{0}' in reader schema must have a
default value")]
MissingDefaultValue(String),
- #[error("Incompatible schemata! Reader's symbols must contain all writer's
symbols")]
+ #[error("Incompatible schemata! Reader's symbols contain none of the
writer's symbols")]
MissingSymbols,
#[error("Incompatible schemata! All elements in union must match for both
schemas")]
MissingUnionElements,
- #[error("Incompatible schemata! Name and size don't match for fixed")]
+ #[error("Incompatible schemata! At least one element in the union must
match the schema")]
+ SchemaMismatchAllUnionElements,
+
+ #[error("Incompatible schemata! Size doesn't match for fixed")]
FixedMismatch,
+ #[error(
+ "Incompatible schemata! Decimal precision and/or scale don't match,
reader: ({r_precision},{r_scale}), writer: ({w_precision},{w_scale})"
+ )]
+ DecimalMismatch {
+ r_precision: usize,
+ r_scale: usize,
+ w_precision: usize,
+ w_scale: usize,
+ },
+
#[error(
"Incompatible schemata! The name must be the same for both schemas.
Writer's name {writer_name} and reader's name {reader_name}"
)]
diff --git a/avro/src/lib.rs b/avro/src/lib.rs
index f75c5a3..5c744ff 100644
--- a/avro/src/lib.rs
+++ b/avro/src/lib.rs
@@ -104,7 +104,7 @@
//!
//! # Minimum supported Rust version
//!
-//! 1.86.0
+//! 1.88.0
//!
//! # Defining a schema
//!
diff --git a/avro/src/schema.rs b/avro/src/schema.rs
index 78dcebd..9d988c8 100644
--- a/avro/src/schema.rs
+++ b/avro/src/schema.rs
@@ -1897,10 +1897,10 @@ impl Parser {
) -> AvroResult<Schema> {
let fields_opt = complex.get("fields");
- if fields_opt.is_none() {
- if let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace) {
- return Ok(seen.clone());
- }
+ if fields_opt.is_none()
+ && let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace)
+ {
+ return Ok(seen.clone());
}
let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
@@ -1976,10 +1976,10 @@ impl Parser {
) -> AvroResult<Schema> {
let symbols_opt = complex.get("symbols");
- if symbols_opt.is_none() {
- if let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace) {
- return Ok(seen.clone());
- }
+ if symbols_opt.is_none()
+ && let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace)
+ {
+ return Ok(seen.clone());
}
let fully_qualified_name = Name::parse(complex, enclosing_namespace)?;
@@ -2119,10 +2119,10 @@ impl Parser {
enclosing_namespace: &Namespace,
) -> AvroResult<Schema> {
let size_opt = complex.get("size");
- if size_opt.is_none() {
- if let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace) {
- return Ok(seen.clone());
- }
+ if size_opt.is_none()
+ && let Some(seen) = self.get_already_seen_schema(complex,
enclosing_namespace)
+ {
+ return Ok(seen.clone());
}
let doc = complex.get("doc").and_then(|v| match &v {
@@ -2483,11 +2483,11 @@ fn pcf_map(schema: &Map<String, Value>, defined_names:
&mut HashSet<String>) ->
}
// Fully qualify the name, if it isn't already ([FULLNAMES] rule).
- if k == "name" {
- if let Some(ref n) = name {
- fields.push(("name", format!("{}:{}", pcf_string(k),
pcf_string(n))));
- continue;
- }
+ if k == "name"
+ && let Some(ref n) = name
+ {
+ fields.push(("name", format!("{}:{}", pcf_string(k),
pcf_string(n))));
+ continue;
}
// Strip off quotes surrounding "size" type, if they exist ([INTEGERS]
rule).
diff --git a/avro/src/schema_compatibility.rs b/avro/src/schema_compatibility.rs
index 560302b..74ae558 100644
--- a/avro/src/schema_compatibility.rs
+++ b/avro/src/schema_compatibility.rs
@@ -16,271 +16,362 @@
// under the License.
//! Logic for checking schema compatibility
-use crate::schema::UuidSchema;
use crate::{
error::CompatibilityError,
- schema::{EnumSchema, FixedSchema, RecordSchema, Schema, SchemaKind},
+ schema::{
+ ArraySchema, DecimalSchema, EnumSchema, InnerDecimalSchema, MapSchema,
RecordSchema,
+ Schema, UuidSchema,
+ },
};
use std::{
- collections::{HashSet, hash_map::DefaultHasher},
+ collections::{HashMap, hash_map::DefaultHasher},
hash::Hasher,
+ iter::once,
+ ops::BitAndAssign,
ptr,
};
-fn match_ref_schemas(
- writers_schema: &Schema,
- readers_schema: &Schema,
-) -> Result<(), CompatibilityError> {
- match (readers_schema, writers_schema) {
- (Schema::Ref { name: r_name }, Schema::Ref { name: w_name }) => {
- if r_name == w_name {
- Ok(())
- } else {
- Err(CompatibilityError::NameMismatch {
- writer_name: w_name.fullname(None),
- reader_name: r_name.fullname(None),
- })
- }
+pub struct SchemaCompatibility;
+
+/// How compatible are two schemas.
+#[derive(Debug, Copy, Clone, Eq, PartialEq)]
+pub enum Compatibility {
+ /// Full compatibility, resolving will always work.
+ Full,
+ /// Partial compatibility, resolving may error.
+ ///
+ /// This can happen if an enum doesn't have all symbols, or unions don't
entirely overlap.
+ Partial,
+}
+
+impl BitAndAssign for Compatibility {
+ /// Combine two compatibilities.
+ ///
+ /// # Truth table
+ /// | | Full | Partial |
+ /// | ------- | ------- | ------- |
+ /// | Full | Full | Partial |
+ /// | Partial | Partial | Partial |
+ fn bitand_assign(&mut self, rhs: Self) {
+ match (*self, rhs) {
+ (Self::Full, Self::Full) => *self = Self::Full,
+ _ => *self = Self::Partial,
}
- _ => Err(CompatibilityError::WrongType {
- writer_schema_type: format!("{writers_schema:#?}"),
- reader_schema_type: format!("{readers_schema:#?}"),
- }),
}
}
-pub struct SchemaCompatibility;
-
struct Checker {
- recursion: HashSet<(u64, u64)>,
+ recursion: HashMap<(u64, u64), Compatibility>,
}
impl Checker {
/// Create a new checker, with recursion set to an empty set.
pub(crate) fn new() -> Self {
Self {
- recursion: HashSet::new(),
+ recursion: HashMap::new(),
}
}
- pub(crate) fn can_read(
+ /// Check if the reader schema can be resolved from the writer schema.
+ pub(crate) fn full_match_schemas(
&mut self,
writers_schema: &Schema,
readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
- self.full_match_schemas(writers_schema, readers_schema)
+ ) -> Result<Compatibility, CompatibilityError> {
+ // Hash both reader and writer based on their pointer value. This is a
fast way to see if
+ // we get the exact same schemas multiple times (because of recursive
types)
+ let key = (
+ Self::pointer_hash(writers_schema),
+ Self::pointer_hash(readers_schema),
+ );
+
+ // If we already saw this pairing, return the previous value
+ if let Some(c) = self.recursion.get(&key).copied() {
+ Ok(c)
+ } else {
+ let c = self.inner_full_match_schemas(writers_schema,
readers_schema)?;
+ // Insert the new value
+ self.recursion.insert(key, c);
+ Ok(c)
+ }
}
- pub(crate) fn full_match_schemas(
+ /// Hash a schema based only on its pointer value.
+ fn pointer_hash(schema: &Schema) -> u64 {
+ let mut hasher = DefaultHasher::new();
+ ptr::hash(schema, &mut hasher);
+ hasher.finish()
+ }
+
+ /// The actual implementation of [`Self::full_match_schemas`] but without the
recursion protection.
+ ///
+ /// This function should never be called directly as it can recurse
infinitely on recursive types.
+ #[rustfmt::skip]
+ fn inner_full_match_schemas(
&mut self,
writers_schema: &Schema,
readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
- if self.recursion_in_progress(writers_schema, readers_schema) {
- return Ok(());
- }
-
- SchemaCompatibility::match_schemas(writers_schema, readers_schema)?;
-
- let w_type = SchemaKind::from(writers_schema);
- let r_type = SchemaKind::from(readers_schema);
-
- if w_type != SchemaKind::Union
- && (r_type.is_primitive()
- || r_type == SchemaKind::Fixed
- || r_type == SchemaKind::Uuid
- || r_type == SchemaKind::Date
- || r_type == SchemaKind::TimeMillis
- || r_type == SchemaKind::TimeMicros
- || r_type == SchemaKind::TimestampMillis
- || r_type == SchemaKind::TimestampMicros
- || r_type == SchemaKind::TimestampNanos
- || r_type == SchemaKind::LocalTimestampMillis
- || r_type == SchemaKind::LocalTimestampMicros
- || r_type == SchemaKind::LocalTimestampNanos)
+ ) -> Result<Compatibility, CompatibilityError> {
+ // Compare unqualified names if the schemas have them
+ if let Some(w_name) = writers_schema.name()
+ && let Some(r_name) = readers_schema.name()
+ && w_name.name != r_name.name
{
- return Ok(());
+ return Err(CompatibilityError::NameMismatch {
+ writer_name: w_name.name.clone(),
+ reader_name: r_name.name.clone(),
+ });
}
- match r_type {
- SchemaKind::Ref => match_ref_schemas(writers_schema,
readers_schema),
- SchemaKind::Record => self.match_record_schemas(writers_schema,
readers_schema),
- SchemaKind::Map => {
- if let Schema::Map(w_m) = writers_schema {
- match readers_schema {
- Schema::Map(r_m) =>
self.full_match_schemas(&w_m.types, &r_m.types),
- _ => Err(CompatibilityError::WrongType {
- writer_schema_type: format!("{writers_schema:#?}"),
- reader_schema_type: format!("{readers_schema:#?}"),
- }),
- }
+ // Logical types are downgraded to their actual type
+ match (writers_schema, readers_schema) {
+ (Schema::Ref { name: w_name }, Schema::Ref { name: r_name }) => {
+ if r_name == w_name {
+ Ok(Compatibility::Full)
} else {
- Err(CompatibilityError::TypeExpected {
- schema_type: String::from("writers_schema"),
- expected_type: vec![SchemaKind::Record],
+ Err(CompatibilityError::NameMismatch {
+ writer_name: w_name.fullname(None),
+ reader_name: r_name.fullname(None),
})
}
}
- SchemaKind::Array => {
- if let Schema::Array(w_a) = writers_schema {
- match readers_schema {
- Schema::Array(r_a) =>
self.full_match_schemas(&w_a.items, &r_a.items),
- _ => Err(CompatibilityError::WrongType {
- writer_schema_type: format!("{writers_schema:#?}"),
- reader_schema_type: format!("{readers_schema:#?}"),
- }),
- }
+ (Schema::Union(writer), Schema::Union(reader)) => {
+ let mut any = false;
+ let mut all = true;
+ for writer in &writer.schemas {
+ // Try to find a reader variant that is fully compatible
with this writer variant.
+ // In case that does not exist, we keep track of any
partial compatibility we find.
+ let mut local_any = false;
+ all &= reader.schemas.iter().any(|reader| {
+ match self.full_match_schemas(writer, reader) {
+ Ok(Compatibility::Full) => {
+ local_any = true;
+ true
+ }
+ Ok(Compatibility::Partial) => {
+ local_any = true;
+ false
+ }
+ Err(_) => false,
+ }
+ });
+ // Save any match we found
+ any |= local_any;
+ }
+ if all {
+ // All writer variants are fully compatible with reader
variants
+ Ok(Compatibility::Full)
+ } else if any {
+ // At least one writer variant is partially or fully
compatible with a reader variant
+ Ok(Compatibility::Partial)
} else {
- Err(CompatibilityError::TypeExpected {
- schema_type: String::from("writers_schema"),
- expected_type: vec![SchemaKind::Array],
- })
+ Err(CompatibilityError::MissingUnionElements)
}
}
- SchemaKind::Union => self.match_union_schemas(writers_schema,
readers_schema),
- SchemaKind::Enum => {
- // reader's symbols must contain all writer's symbols
- if let Schema::Enum(EnumSchema {
- symbols: w_symbols, ..
- }) = writers_schema
- {
- if let Schema::Enum(EnumSchema {
- symbols: r_symbols, ..
- }) = readers_schema
- {
- if w_symbols.iter().all(|e| r_symbols.contains(e)) {
- return Ok(());
+ (Schema::Union(writer), _) => {
+ // Check if all writer variants are fully compatible with the
reader schema.
+ // We keep track of if we see any (partial) compatibility.
+ let mut any = false;
+ let mut all = true;
+ for writer in &writer.schemas {
+ match self.full_match_schemas(writer, readers_schema) {
+ Ok(Compatibility::Full) => any = true,
+ Ok(Compatibility::Partial) => {
+ any = true;
+ all = false;
+ }
+ Err(_) => {
+ all = false;
}
}
}
- Err(CompatibilityError::MissingSymbols)
+ if all {
+ // All writer variants are fully compatible with the
reader schema
+ Ok(Compatibility::Full)
+ } else if any {
+ // At least one writer variant is partially compatible
with the reader schema
+ Ok(Compatibility::Partial)
+ } else {
+ Err(CompatibilityError::SchemaMismatchAllUnionElements)
+ }
}
- _ => {
- if w_type == SchemaKind::Union {
- if let Schema::Union(r) = writers_schema {
- if r.schemas.len() == 1 {
- return self.full_match_schemas(&r.schemas[0],
readers_schema);
+ (_, Schema::Union(reader)) => {
+ // Try to find a fully compatible reader variant for the
writer schema.
+ // In case that does not exist, we keep track of any partial
compatibility.
+ let mut partial = false;
+ if reader.schemas.iter().any(|reader| {
+ match self.full_match_schemas(writers_schema, reader) {
+ Ok(Compatibility::Full) => true,
+ Ok(Compatibility::Partial) => {
+ partial = true;
+ false
}
+ Err(_) => false,
}
+ }) {
+ // At least one reader variant is fully compatible with
the writer schema
+ Ok(Compatibility::Full)
+ } else if partial {
+ // At least one reader variant is partially compatible
with the writer schema
+ Ok(Compatibility::Partial)
+ } else {
+ Err(CompatibilityError::SchemaMismatchAllUnionElements)
}
- Err(CompatibilityError::Inconclusive(String::from(
- "writers_schema",
- )))
}
- }
- }
-
- fn match_record_schemas(
- &mut self,
- writers_schema: &Schema,
- readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
- let w_type = SchemaKind::from(writers_schema);
-
- if w_type == SchemaKind::Union {
- return Err(CompatibilityError::TypeExpected {
- schema_type: String::from("writers_schema"),
- expected_type: vec![SchemaKind::Record],
- });
- }
-
- if let Schema::Record(RecordSchema {
- fields: w_fields,
- lookup: w_lookup,
- ..
- }) = writers_schema
- {
- if let Schema::Record(RecordSchema {
- fields: r_fields, ..
- }) = readers_schema
- {
- for field in r_fields.iter() {
- // get all field names in a vector (field.name + aliases)
- let mut fields_names = vec![&field.name];
- if let Some(ref aliases) = field.aliases {
- for alias in aliases {
- fields_names.push(alias);
- }
- }
-
- // Check whether any of the possible fields names are in
the writer schema.
- // If the field was found, then it must have the exact
same name with the writer field,
- // otherwise we would have a false positive with the
writers aliases
- let position = fields_names.iter().find_map(|field_name| {
- if let Some(pos) = w_lookup.get(*field_name) {
- if &w_fields[*pos].name == *field_name {
- return Some(pos);
- }
- }
- None
+ (Schema::Null, Schema::Null) => Ok(Compatibility::Full),
+ (Schema::Boolean, Schema::Boolean) => Ok(Compatibility::Full),
+ // int promotes to long, float and double
+ (
+ Schema::Int | Schema::Date | Schema::TimeMillis,
+ Schema::Int | Schema::Long | Schema::Float | Schema::Double |
Schema::Date
+ | Schema::TimeMillis | Schema::TimeMicros |
Schema::TimestampMillis
+ | Schema::TimestampMicros | Schema::TimestampNanos |
Schema::LocalTimestampMillis
+ | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
+ ) => Ok(Compatibility::Full),
+ // long promotes to float and double
+ (
+ Schema::Long | Schema::TimeMicros | Schema::TimestampMillis
+ | Schema::TimestampMicros | Schema::TimestampNanos |
Schema::LocalTimestampMillis
+ | Schema::LocalTimestampMicros | Schema::LocalTimestampNanos,
+ Schema::Long | Schema::Float | Schema::Double |
Schema::TimeMicros
+ | Schema::TimestampMillis | Schema::TimestampMicros |
Schema::TimestampNanos
+ | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros
+ | Schema::LocalTimestampNanos,
+ ) => Ok(Compatibility::Full),
+ // float promotes to double
+ (Schema::Float, Schema::Float | Schema::Double) =>
Ok(Compatibility::Full),
+ (Schema::Double, Schema::Double) => Ok(Compatibility::Full),
+ // This check needs to be above the other Decimal checks, so that
if both schemas are
+ // Decimal then the precision and scale are checked. The other
Decimal checks below will
+ // thus only hit if only one of the schemas is a Decimal
+ (
+ Schema::Decimal(DecimalSchema { precision: w_precision, scale:
w_scale, .. }),
+ Schema::Decimal(DecimalSchema { precision: r_precision, scale:
r_scale, .. }),
+ ) => {
+ // precision and scale must match
+ if r_precision == w_precision && r_scale == w_scale {
+ Ok(Compatibility::Full)
+ } else {
+ Err(CompatibilityError::DecimalMismatch {
+ r_precision: *r_precision,
+ r_scale: *r_scale,
+ w_precision: *w_precision,
+ w_scale: *w_scale
+ })
+ }
+ }
+ // bytes and strings are interchangeable
+ (
+ Schema::Bytes | Schema::String | Schema::BigDecimal
+ | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
+ | Schema::Decimal(DecimalSchema { inner:
InnerDecimalSchema::Bytes, .. }),
+ Schema::Bytes | Schema::String | Schema::BigDecimal
+ | Schema::Uuid(UuidSchema::String | UuidSchema::Bytes)
+ | Schema::Decimal(DecimalSchema { inner:
InnerDecimalSchema::Bytes, .. }),
+ ) => Ok(Compatibility::Full),
+ (Schema::Uuid(_), Schema::Uuid(_)) => Ok(Compatibility::Full),
+ (
+ Schema::Fixed(w_fixed) |
Schema::Uuid(UuidSchema::Fixed(w_fixed))
+ | Schema::Decimal(DecimalSchema { inner:
InnerDecimalSchema::Fixed(w_fixed), .. })
+ | Schema::Duration(w_fixed),
+ Schema::Fixed(r_fixed) |
Schema::Uuid(UuidSchema::Fixed(r_fixed))
+ | Schema::Decimal(DecimalSchema { inner:
InnerDecimalSchema::Fixed(r_fixed), .. })
+ | Schema::Duration(r_fixed),
+ ) => {
+ // Size must match
+ if r_fixed.size == w_fixed.size {
+ Ok(Compatibility::Full)
+ } else {
+ Err(CompatibilityError::FixedMismatch)
+ }
+ }
+ (
+ Schema::Array(ArraySchema { items: w_items, .. }),
+ Schema::Array(ArraySchema { items: r_items, .. }),
+ ) => {
+ // array schemas must match
+ self.full_match_schemas(w_items, r_items)
+ }
+ (
+ Schema::Map(MapSchema { types: w_types, .. }),
+ Schema::Map(MapSchema { types: r_types, .. }),
+ ) => {
+ // type schemas must match
+ self.full_match_schemas(w_types, r_types)
+ }
+ (
+ Schema::Enum(EnumSchema { symbols: w_symbols, .. }),
+ Schema::Enum(EnumSchema { symbols: r_symbols, default:
r_default, .. }),
+ ) => {
+ // Reader must have a default or all symbols in the writer
must also be in the reader
+ if r_default.is_some() {
+ // No need to iter over all the fields if there is a
default
+ Ok(Compatibility::Full)
+ } else {
+ let mut any = false;
+ let mut all = true;
+ w_symbols.iter().for_each(|s| {
+ let res = r_symbols.contains(s);
+ any |= res;
+ all &= res;
});
-
- match position {
- Some(pos) => {
- if let Err(err) =
-
self.full_match_schemas(&w_fields[*pos].schema, &field.schema)
- {
+ if all {
+ // All symbols match
+ Ok(Compatibility::Full)
+ } else if any {
+ // Only some symbols match
+ Ok(Compatibility::Partial)
+ } else {
+ // No symbols match
+ Err(CompatibilityError::MissingSymbols)
+ }
+ }
+ }
+ (
+ Schema::Record(RecordSchema { fields: w_fields, .. }),
+ Schema::Record(RecordSchema { fields: r_fields, .. }),
+ ) => {
+ let mut compatibility = Compatibility::Full;
+ for r_field in r_fields {
+ // Can't use RecordField.lookup as aliases are also
inserted into there and we
+ // are not allowed to match on writer aliases.
+ // Search using field name and *after* that aliases.
+ if let Some(w_field) = once(&r_field.name)
+
.chain(r_field.aliases.as_deref().unwrap_or(&[]).iter())
+ .find_map(|ra| w_fields.iter().find(|wf| &wf.name ==
ra))
+ {
+ // Check that the schemas are compatible
+ match self.full_match_schemas(&w_field.schema,
&r_field.schema) {
+ Ok(c) => compatibility &= c,
+ Err(err) => {
return
Err(CompatibilityError::FieldTypeMismatch(
- field.name.clone(),
+ r_field.name.clone(),
Box::new(err),
));
}
}
- _ => {
- if field.default.is_none() {
- return
Err(CompatibilityError::MissingDefaultValue(
- field.name.clone(),
- ));
- }
- }
+ } else if r_field.default.is_none() {
+ // No default and no matching field in the writer
+ return Err(CompatibilityError::MissingDefaultValue(
+ r_field.name.clone(),
+ ));
}
}
+ Ok(compatibility)
}
+ (_, _) => Err(CompatibilityError::WrongType {
+ writer_schema_type: format!("{writers_schema:#?}"),
+ reader_schema_type: format!("{readers_schema:#?}"),
+ }),
}
- Ok(())
}
- fn match_union_schemas(
+ pub(crate) fn can_read(
&mut self,
writers_schema: &Schema,
readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
- if let Schema::Union(u) = writers_schema {
- if u.schemas
- .iter()
- .all(|schema| self.full_match_schemas(schema,
readers_schema).is_ok())
- {
- return Ok(());
- } else {
- return Err(CompatibilityError::MissingUnionElements);
- }
- } else if let Schema::Union(u) = readers_schema {
- // This check is needed because the writer_schema can be not union
- // but the type can be contain in the union of the reader schema
- // e.g. writer_schema is string and reader_schema is [string, int]
- if u.schemas
- .iter()
- .any(|schema| self.full_match_schemas(writers_schema,
schema).is_ok())
- {
- return Ok(());
- }
- }
- Err(CompatibilityError::MissingUnionElements)
- }
-
- fn recursion_in_progress(&mut self, writers_schema: &Schema,
readers_schema: &Schema) -> bool {
- let mut hasher = DefaultHasher::new();
- ptr::hash(writers_schema, &mut hasher);
- let w_hash = hasher.finish();
-
- hasher = DefaultHasher::new();
- ptr::hash(readers_schema, &mut hasher);
- let r_hash = hasher.finish();
-
- let key = (w_hash, r_hash);
- // This is a shortcut to add if not exists *and* return false. It will
return true
- // if it was able to insert.
- !self.recursion.insert(key)
+ ) -> Result<Compatibility, CompatibilityError> {
+ self.full_match_schemas(writers_schema, readers_schema)
}
}
@@ -290,7 +381,7 @@ impl SchemaCompatibility {
pub fn can_read(
writers_schema: &Schema,
readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
+ ) -> Result<Compatibility, CompatibilityError> {
let mut c = Checker::new();
c.can_read(writers_schema, readers_schema)
}
@@ -300,308 +391,21 @@ impl SchemaCompatibility {
pub fn mutual_read(
writers_schema: &Schema,
readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
- SchemaCompatibility::can_read(writers_schema, readers_schema)?;
- SchemaCompatibility::can_read(readers_schema, writers_schema)
- }
-
- /// `match_schemas` performs a basic check that a datum written with the
- /// writers_schema could be read using the readers_schema. This check
only includes
- /// matching the types, including schema promotion, and matching the full
name for
- /// named types. Aliases for named types are not supported here, and the
rust
- /// implementation of Avro in general does not include support for
aliases (I think).
- pub(crate) fn match_schemas(
- writers_schema: &Schema,
- readers_schema: &Schema,
- ) -> Result<(), CompatibilityError> {
- fn check_reader_type_multi(
- reader_type: SchemaKind,
- allowed_reader_types: Vec<SchemaKind>,
- writer_type: SchemaKind,
- ) -> Result<(), CompatibilityError> {
- if allowed_reader_types.contains(&reader_type) {
- Ok(())
- } else {
- let mut allowed_types: Vec<SchemaKind> = vec![writer_type];
-
allowed_types.extend_from_slice(allowed_reader_types.as_slice());
- Err(CompatibilityError::TypeExpected {
- schema_type: String::from("readers_schema"),
- expected_type: allowed_types,
- })
- }
- }
-
- fn check_reader_type(
- reader_type: SchemaKind,
- allowed_reader_type: SchemaKind,
- writer_type: SchemaKind,
- ) -> Result<(), CompatibilityError> {
- if reader_type == allowed_reader_type {
- Ok(())
- } else {
- Err(CompatibilityError::TypeExpected {
- schema_type: String::from("readers_schema"),
- expected_type: vec![writer_type, allowed_reader_type],
- })
- }
- }
-
- fn check_writer_type(
- writers_schema: &Schema,
- allowed_schema: &Schema,
- expected_schema_types: Vec<SchemaKind>,
- ) -> Result<(), CompatibilityError> {
- if *allowed_schema == *writers_schema {
- Ok(())
- } else {
- Err(CompatibilityError::TypeExpected {
- schema_type: String::from("writers_schema"),
- expected_type: expected_schema_types,
- })
- }
- }
-
- let w_type = SchemaKind::from(writers_schema);
- let r_type = SchemaKind::from(readers_schema);
-
- if w_type == SchemaKind::Union || r_type == SchemaKind::Union {
- return Ok(());
- }
-
- if w_type == r_type {
- if r_type.is_primitive() {
- return Ok(());
- }
-
- match r_type {
- SchemaKind::Record | SchemaKind::Enum => {
- let msg = format!("A {readers_schema} type must always has
a name");
- let writers_name = writers_schema.name().expect(&msg);
- let readers_name = readers_schema.name().expect(&msg);
-
- if writers_name.name == readers_name.name {
- return Ok(());
- }
-
- return Err(CompatibilityError::NameMismatch {
- writer_name: writers_name.name.clone(),
- reader_name: readers_name.name.clone(),
- });
- }
- SchemaKind::Fixed => {
- if let Schema::Fixed(FixedSchema {
- name: w_name,
- aliases: _,
- doc: _w_doc,
- size: w_size,
- default: _w_default,
- attributes: _,
- }) = writers_schema
- {
- if let Schema::Fixed(FixedSchema {
- name: r_name,
- aliases: _,
- doc: _r_doc,
- size: r_size,
- default: _r_default,
- attributes: _,
- }) = readers_schema
- {
- return (w_name.name == r_name.name && w_size ==
r_size)
- .then_some(())
- .ok_or(CompatibilityError::FixedMismatch);
- }
- }
- }
- SchemaKind::Map => {
- if let Schema::Map(w_m) = writers_schema {
- if let Schema::Map(r_m) = readers_schema {
- return
SchemaCompatibility::match_schemas(&w_m.types, &r_m.types);
- }
- }
- }
- SchemaKind::Array => {
- if let Schema::Array(w_a) = writers_schema {
- if let Schema::Array(r_a) = readers_schema {
- return
SchemaCompatibility::match_schemas(&w_a.items, &r_a.items);
- }
- }
- }
- SchemaKind::Uuid => match readers_schema {
- Schema::Uuid(UuidSchema::Bytes) => {
- return check_writer_type(
- writers_schema,
- readers_schema,
- vec![r_type, SchemaKind::Bytes],
- );
- }
- Schema::Uuid(UuidSchema::String) => {
- return check_writer_type(
- writers_schema,
- readers_schema,
- vec![r_type, SchemaKind::String],
- );
- }
- Schema::Uuid(UuidSchema::Fixed(FixedSchema {
- name: r_name,
- size: r_size,
- ..
- })) => match writers_schema {
- Schema::Uuid(UuidSchema::Fixed(FixedSchema {
- name: w_name,
- size: w_size,
- ..
- }))
- | Schema::Fixed(FixedSchema {
- name: w_name,
- size: w_size,
- ..
- }) => {
- return (w_name.name == r_name.name && w_size ==
r_size)
- .then_some(())
- .ok_or(CompatibilityError::FixedMismatch);
- }
- _ => {
- return Err(CompatibilityError::TypeExpected {
- schema_type: String::from("writers_schema"),
- expected_type: vec![SchemaKind::Uuid,
SchemaKind::Fixed],
- });
- }
- },
- Schema::Null
- | Schema::Boolean
- | Schema::Int
- | Schema::Long
- | Schema::Float
- | Schema::Double
- | Schema::Bytes
- | Schema::String
- | Schema::Array(_)
- | Schema::Map(_)
- | Schema::Union(_)
- | Schema::Record(_)
- | Schema::Enum(_)
- | Schema::Fixed(_)
- | Schema::Decimal(_)
- | Schema::BigDecimal
- | Schema::Date
- | Schema::TimeMillis
- | Schema::TimeMicros
- | Schema::TimestampMillis
- | Schema::TimestampMicros
- | Schema::TimestampNanos
- | Schema::LocalTimestampMillis
- | Schema::LocalTimestampMicros
- | Schema::LocalTimestampNanos
- | Schema::Duration(_)
- | Schema::Ref { .. } => {
- unreachable!("SchemaKind::Uuid can only be a
Schema::Uuid")
- }
- },
- SchemaKind::Date | SchemaKind::TimeMillis => {
- return check_writer_type(
- writers_schema,
- readers_schema,
- vec![r_type, SchemaKind::Int],
- );
- }
- SchemaKind::TimeMicros
- | SchemaKind::TimestampNanos
- | SchemaKind::TimestampMillis
- | SchemaKind::TimestampMicros
- | SchemaKind::LocalTimestampMillis
- | SchemaKind::LocalTimestampMicros
- | SchemaKind::LocalTimestampNanos => {
- return check_writer_type(
- writers_schema,
- readers_schema,
- vec![r_type, SchemaKind::Long],
- );
- }
- SchemaKind::Duration => {
- return Ok(());
- }
- SchemaKind::Ref => return match_ref_schemas(writers_schema,
readers_schema),
- _ => {
- return Err(CompatibilityError::Inconclusive(String::from(
- "readers_schema",
- )));
- }
- };
- }
-
- // Here are the checks for primitive types
- match w_type {
- SchemaKind::Int => check_reader_type_multi(
- r_type,
- vec![
- SchemaKind::Long,
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::Date,
- SchemaKind::TimeMillis,
- ],
- w_type,
- ),
- SchemaKind::Long => check_reader_type_multi(
- r_type,
- vec![
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::TimeMicros,
- SchemaKind::TimestampMillis,
- SchemaKind::TimestampMicros,
- SchemaKind::TimestampNanos,
- SchemaKind::LocalTimestampMillis,
- SchemaKind::LocalTimestampMicros,
- SchemaKind::LocalTimestampNanos,
- ],
- w_type,
- ),
- SchemaKind::Float => {
- check_reader_type_multi(r_type, vec![SchemaKind::Float,
SchemaKind::Double], w_type)
- }
- SchemaKind::String => {
- check_reader_type_multi(r_type, vec![SchemaKind::Bytes,
SchemaKind::Uuid], w_type)
- }
- SchemaKind::Bytes => {
- check_reader_type_multi(r_type, vec![SchemaKind::String,
SchemaKind::Uuid], w_type)
- }
- SchemaKind::Uuid => check_reader_type_multi(
- r_type,
- vec![SchemaKind::String, SchemaKind::Bytes, SchemaKind::Fixed],
- w_type,
- ),
- SchemaKind::Fixed => check_reader_type_multi(
- r_type,
- vec![SchemaKind::Duration, SchemaKind::Uuid],
- w_type,
- ),
- SchemaKind::Date | SchemaKind::TimeMillis => {
- check_reader_type(r_type, SchemaKind::Int, w_type)
- }
- SchemaKind::TimeMicros
- | SchemaKind::TimestampMicros
- | SchemaKind::TimestampMillis
- | SchemaKind::TimestampNanos
- | SchemaKind::LocalTimestampMillis
- | SchemaKind::LocalTimestampMicros
- | SchemaKind::LocalTimestampNanos => {
- check_reader_type(r_type, SchemaKind::Long, w_type)
- }
- _ => Err(CompatibilityError::Inconclusive(String::from(
- "writers_schema",
- ))),
- }
+ ) -> Result<Compatibility, CompatibilityError> {
+ let mut c = SchemaCompatibility::can_read(writers_schema,
readers_schema)?;
+ c &= SchemaCompatibility::can_read(readers_schema, writers_schema)?;
+ Ok(c)
}
}
#[cfg(test)]
mod tests {
+ use std::collections::BTreeMap;
+
use super::*;
- use crate::schema::{Name, UuidSchema};
use crate::{
- Codec, Reader, Writer,
+ Codec, Decimal, Reader, Writer,
+ schema::{FixedSchema, Name, UuidSchema},
types::{Record, Value},
};
use apache_avro_test_helper::TestResult;
@@ -749,9 +553,9 @@ mod tests {
#[test]
fn test_broken() {
assert_eq!(
- CompatibilityError::MissingUnionElements,
- SchemaCompatibility::can_read(&int_string_union_schema(),
&int_union_schema())
- .unwrap_err()
+ Compatibility::Partial,
+ SchemaCompatibility::can_read(&int_string_union_schema(),
&int_union_schema()).unwrap(),
+ "Only compatible if writer writes an int"
)
}
@@ -847,12 +651,12 @@ mod tests {
r#"{"type": "int"}"#,
r#"{"type": "int", "logicalType": "time-millis"}"#
)]
- // time-millis type
+ // time-micros type
#[case(
r#"{"type": "long"}"#,
r#"{"type": "long", "logicalType": "time-micros"}"#
)]
- // timetimestamp-nanos type
+ // timestamp-nanos type
#[case(
r#"{"type": "long"}"#,
r#"{"type": "long", "logicalType": "timestamp-nanos"}"#
@@ -894,7 +698,7 @@ mod tests {
let writer_schema = Schema::parse_str(writer_schema_str).unwrap();
let reader_schema = Schema::parse_str(reader_schema_str).unwrap();
- assert!(SchemaCompatibility::match_schemas(&writer_schema,
&reader_schema).is_ok());
+ assert!(SchemaCompatibility::can_read(&writer_schema,
&reader_schema).is_ok());
}
#[rstest]
@@ -920,152 +724,73 @@ mod tests {
#[case(
r#"{"type":"map", "values": "long"}"#,
r#"{"type":"map", "values": "int"}"#,
- CompatibilityError::TypeExpected {schema_type:
String::from("readers_schema"), expected_type: vec![
- SchemaKind::Long,
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::TimeMicros,
- SchemaKind::TimestampMillis,
- SchemaKind::TimestampMicros,
- SchemaKind::TimestampNanos,
- SchemaKind::LocalTimestampMillis,
- SchemaKind::LocalTimestampMicros,
- SchemaKind::LocalTimestampNanos,
- ]}
+ CompatibilityError::WrongType { writer_schema_type:
"Long".to_string(), reader_schema_type: "Int".to_string() }
)]
// Array type test
#[case(
r#"{"type": "array", "items": "long"}"#,
r#"{"type": "array", "items": "int"}"#,
- CompatibilityError::TypeExpected {schema_type:
String::from("readers_schema"), expected_type: vec![
- SchemaKind::Long,
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::TimeMicros,
- SchemaKind::TimestampMillis,
- SchemaKind::TimestampMicros,
- SchemaKind::TimestampNanos,
- SchemaKind::LocalTimestampMillis,
- SchemaKind::LocalTimestampMicros,
- SchemaKind::LocalTimestampNanos,
- ]}
+ CompatibilityError::WrongType { writer_schema_type:
"Long".to_string(), reader_schema_type: "Int".to_string() }
)]
// Date type test
#[case(
r#"{"type": "string"}"#,
r#"{"type": "int", "logicalType": "date"}"#,
- CompatibilityError::TypeExpected{schema_type:
String::from("readers_schema"), expected_type: vec![
- SchemaKind::String,
- SchemaKind::Bytes,
- SchemaKind::Uuid,
- ]}
+ CompatibilityError::WrongType { writer_schema_type:
"String".to_string(), reader_schema_type: "Date".to_string() }
)]
// time-millis type
#[case(
r#"{"type": "string"}"#,
r#"{"type": "int", "logicalType": "time-millis"}"#,
- CompatibilityError::TypeExpected{schema_type:
String::from("readers_schema"), expected_type: vec![
- SchemaKind::String,
- SchemaKind::Bytes,
- SchemaKind::Uuid,
- ]}
+ CompatibilityError::WrongType { writer_schema_type:
"String".to_string(), reader_schema_type: "TimeMillis".to_string() }
)]
// time-millis type
#[case(
- r#"{"type": "int"}"#,
+ r#"{"type": "string"}"#,
r#"{"type": "long", "logicalType": "time-micros"}"#,
- CompatibilityError::TypeExpected{schema_type:
String::from("readers_schema"), expected_type: vec![
- SchemaKind::Int,
- SchemaKind::Long,
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::Date,
- SchemaKind::TimeMillis
- ]}
+ CompatibilityError::WrongType { writer_schema_type:
"String".to_string(), reader_schema_type: "TimeMicros".to_string() }
+ )]
+ // timestamp-nanos type
+ #[case(
+ r#"{"type": "string"}"#,
+ r#"{"type": "long", "logicalType": "timestamp-nanos"}"#,
+ CompatibilityError::WrongType { writer_schema_type:
"String".to_string(), reader_schema_type: "TimestampNanos".to_string() }
)]
- // timestamp-nanos type. This test should fail because it is not supported
on schema parse_complex
- // #[case(
- // r#"{"type": "string"}"#,
- // r#"{"type": "long", "logicalType": "timestamp-nanos"}"#,
- // CompatibilityError::TypeExpected{schema_type:
String::from("readers_schema"), expected_type: vec![
- // SchemaKind::Int,
- // SchemaKind::Long,
- // SchemaKind::Float,
- // SchemaKind::Double,
- // SchemaKind::Date,
- // SchemaKind::TimeMillis
- // ]}
- // )]
// timestamp-millis type
#[case(
- r#"{"type": "int"}"#,
+ r#"{"type": "string"}"#,
r#"{"type": "long", "logicalType": "timestamp-millis"}"#,
- CompatibilityError::TypeExpected{schema_type:
String::from("readers_schema"), expected_type: vec![
- SchemaKind::Int,
- SchemaKind::Long,
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::Date,
- SchemaKind::TimeMillis
- ]}
+ CompatibilityError::WrongType { writer_schema_type:
"String".to_string(), reader_schema_type: "TimestampMillis".to_string() }
)]
// timestamp-micros type
#[case(
- r#"{"type": "int"}"#,
+ r#"{"type": "string"}"#,
r#"{"type": "long", "logicalType": "timestamp-micros"}"#,
- CompatibilityError::TypeExpected{schema_type:
String::from("readers_schema"), expected_type: vec![
- SchemaKind::Int,
- SchemaKind::Long,
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::Date,
- SchemaKind::TimeMillis
- ]}
+ CompatibilityError::WrongType { writer_schema_type:
"String".to_string(), reader_schema_type: "TimestampMicros".to_string() }
)]
// local-timestamp-millis type
#[case(
- r#"{"type": "int"}"#,
+ r#"{"type": "string"}"#,
r#"{"type": "long", "logicalType": "local-timestamp-millis"}"#,
- CompatibilityError::TypeExpected{schema_type:
String::from("readers_schema"), expected_type: vec![
- SchemaKind::Int,
- SchemaKind::Long,
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::Date,
- SchemaKind::TimeMillis
- ]}
+ CompatibilityError::WrongType { writer_schema_type:
"String".to_string(), reader_schema_type: "LocalTimestampMillis".to_string() }
)]
// local-timestamp-micros type
#[case(
- r#"{"type": "int"}"#,
+ r#"{"type": "string"}"#,
r#"{"type": "long", "logicalType": "local-timestamp-micros"}"#,
- CompatibilityError::TypeExpected{schema_type:
String::from("readers_schema"), expected_type: vec![
- SchemaKind::Int,
- SchemaKind::Long,
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::Date,
- SchemaKind::TimeMillis
- ]}
+ CompatibilityError::WrongType { writer_schema_type:
"String".to_string(), reader_schema_type: "LocalTimestampMicros".to_string() }
)]
- // local-timestamp-nanos type. This test should fail because it is not
supported on schema parse_complex
- // #[case(
- // r#"{"type": "int"}"#,
- // r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#,
- // CompatibilityError::TypeExpected{schema_type:
String::from("readers_schema"), expected_type: vec![
- // SchemaKind::Int,
- // SchemaKind::Long,
- // SchemaKind::Float,
- // SchemaKind::Double,
- // SchemaKind::Date,
- // SchemaKind::TimeMillis
- // ]}
- // )]
- // When comparing different types we always get Inconclusive
+ // local-timestamp-nanos type
+ #[case(
+ r#"{"type": "string"}"#,
+ r#"{"type": "long", "logicalType": "local-timestamp-nanos"}"#,
+ CompatibilityError::WrongType { writer_schema_type:
"String".to_string(), reader_schema_type: "LocalTimestampNanos".to_string() }
+ )]
+ // Names are checked first, so this should not be a WrongType
#[case(
r#"{"type": "record", "name":"record_b", "fields": [{"type": "long",
"name": "date"}]}"#,
r#"{"type": "fixed", "name": "EmployeeId", "size": 16}"#,
- CompatibilityError::Inconclusive(String::from("writers_schema"))
+ CompatibilityError::NameMismatch { writer_name:
"record_b".to_string(), reader_name: "EmployeeId".to_string() }
)]
fn test_avro_3950_match_schemas_error(
#[case] writer_schema_str: &str,
@@ -1077,17 +802,14 @@ mod tests {
assert_eq!(
expected_error,
- SchemaCompatibility::match_schemas(&writer_schema,
&reader_schema).unwrap_err()
+ SchemaCompatibility::can_read(&writer_schema,
&reader_schema).unwrap_err()
)
}
#[test]
fn test_compatible_reader_writer_pairs() {
let uuid_fixed = FixedSchema {
- name: Name {
- name: String::new(),
- namespace: None,
- },
+ name: Name::new("uuid_fixed").unwrap(),
aliases: None,
doc: None,
size: 16,
@@ -1295,11 +1017,14 @@ mod tests {
let valid_reader = string_array_schema();
let invalid_reader = string_map_schema();
- assert!(SchemaCompatibility::can_read(&string_array_schema(),
&valid_reader).is_ok());
assert_eq!(
- CompatibilityError::Inconclusive(String::from("writers_schema")),
- SchemaCompatibility::can_read(&string_array_schema(),
&invalid_reader).unwrap_err()
+ Compatibility::Full,
+ SchemaCompatibility::can_read(&string_array_schema(),
&valid_reader).unwrap()
);
+ assert!(matches!(
+ SchemaCompatibility::can_read(&string_array_schema(),
&invalid_reader),
+ Err(CompatibilityError::WrongType { .. }),
+ ));
}
#[test]
@@ -1307,16 +1032,9 @@ mod tests {
let valid_reader = Schema::String;
assert!(SchemaCompatibility::can_read(&Schema::String,
&valid_reader).is_ok());
assert_eq!(
- CompatibilityError::TypeExpected {
- schema_type: String::from("readers_schema"),
- expected_type: vec![
- SchemaKind::Int,
- SchemaKind::Long,
- SchemaKind::Float,
- SchemaKind::Double,
- SchemaKind::Date,
- SchemaKind::TimeMillis
- ],
+ CompatibilityError::WrongType {
+ writer_schema_type: "Int".to_string(),
+ reader_schema_type: "String".to_string()
},
SchemaCompatibility::can_read(&Schema::Int,
&Schema::String).unwrap_err()
);
@@ -1329,10 +1047,13 @@ mod tests {
let union_reader = union_schema(vec![Schema::String]);
assert_eq!(
- CompatibilityError::MissingUnionElements,
- SchemaCompatibility::can_read(&union_writer,
&union_reader).unwrap_err()
+ Compatibility::Partial,
+ SchemaCompatibility::can_read(&union_writer,
&union_reader).unwrap()
+ );
+ assert_eq!(
+ Compatibility::Full,
+ SchemaCompatibility::can_read(&union_reader,
&union_writer).unwrap()
);
- assert!(SchemaCompatibility::can_read(&union_reader,
&union_writer).is_ok());
}
#[test]
@@ -1356,9 +1077,9 @@ mod tests {
assert_eq!(
CompatibilityError::FieldTypeMismatch(
"field1".to_owned(),
- Box::new(CompatibilityError::TypeExpected {
- schema_type: "readers_schema".to_owned(),
- expected_type: vec![SchemaKind::String, SchemaKind::Bytes,
SchemaKind::Uuid]
+ Box::new(CompatibilityError::WrongType {
+ writer_schema_type: "String".to_string(),
+ reader_schema_type: "Int".to_string()
})
),
SchemaCompatibility::can_read(&string_schema,
&int_schema).unwrap_err()
@@ -1377,10 +1098,13 @@ mod tests {
let enum_schema2 =
Schema::parse_str(r#"{"type":"enum", "name":"MyEnum",
"symbols":["A","B","C"]}"#)?;
assert_eq!(
- CompatibilityError::MissingSymbols,
- SchemaCompatibility::can_read(&enum_schema2,
&enum_schema1).unwrap_err()
+ Compatibility::Partial,
+ SchemaCompatibility::can_read(&enum_schema2, &enum_schema1)?
+ );
+ assert_eq!(
+ Compatibility::Full,
+ SchemaCompatibility::can_read(&enum_schema1, &enum_schema2)?
);
- assert!(SchemaCompatibility::can_read(&enum_schema1,
&enum_schema2).is_ok());
Ok(())
}
@@ -1453,7 +1177,7 @@ mod tests {
// short name match, but no structure match
let read_schema = union_schema(vec![Schema::Null,
point_3d_no_default_schema()]);
assert_eq!(
- CompatibilityError::MissingUnionElements,
+ CompatibilityError::SchemaMismatchAllUnionElements,
SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).unwrap_err()
);
}
@@ -1468,7 +1192,7 @@ mod tests {
point_3d_schema(),
]);
assert_eq!(
- CompatibilityError::MissingUnionElements,
+ CompatibilityError::SchemaMismatchAllUnionElements,
SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).unwrap_err()
);
}
@@ -1483,7 +1207,7 @@ mod tests {
point_2d_schema(),
]);
assert_eq!(
- CompatibilityError::MissingUnionElements,
+ CompatibilityError::SchemaMismatchAllUnionElements,
SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).unwrap_err()
);
}
@@ -1498,7 +1222,7 @@ mod tests {
point_3d_schema(),
]);
assert_eq!(
- CompatibilityError::MissingUnionElements,
+ CompatibilityError::SchemaMismatchAllUnionElements,
SchemaCompatibility::can_read(&point_2d_fullname_schema(),
&read_schema).unwrap_err()
);
}
@@ -1706,7 +1430,10 @@ mod tests {
}"#,
)?;
- assert!(SchemaCompatibility::can_read(&schema_v2, &schema_v1).is_ok());
+ assert_eq!(
+ Compatibility::Full,
+ SchemaCompatibility::can_read(&schema_v2, &schema_v1)?
+ );
assert_eq!(
CompatibilityError::MissingDefaultValue(String::from("time")),
SchemaCompatibility::can_read(&schema_v1, &schema_v2).unwrap_err()
@@ -1815,7 +1542,6 @@ mod tests {
]
}"#,
)?,
- "Incompatible schemata! Field 'success' in reader schema does
not match the type in the writer schema",
),
(
Schema::parse_str(
@@ -1836,17 +1562,17 @@ mod tests {
]
}"#,
)?,
- "Incompatible schemata! Field 'max_values' in reader schema
does not match the type in the writer schema",
),
];
- for (schema_1, schema_2, error) in schemas {
- assert!(SchemaCompatibility::can_read(&schema_1,
&schema_2).is_ok());
+ for (schema_1, schema_2) in schemas {
+ assert_eq!(
+ Compatibility::Full,
+ SchemaCompatibility::can_read(&schema_1, &schema_2).unwrap()
+ );
assert_eq!(
- error,
- SchemaCompatibility::can_read(&schema_2, &schema_1)
- .unwrap_err()
- .to_string()
+ Compatibility::Partial,
+ SchemaCompatibility::can_read(&schema_2, &schema_1).unwrap()
);
}
@@ -1887,4 +1613,65 @@ mod tests {
Ok(())
}
+
+ #[test]
+ fn duration_and_fixed_of_different_size() -> TestResult {
+ let schema_strs = vec![
+ r#"{
+ "type": "fixed",
+ "name": "Fixed25",
+ "size": 25
+ }
+ "#,
+ r#"{
+ "type": "fixed",
+ "logicalType": "duration",
+ "name": "Duration",
+ "size": 12
+ }
+ "#,
+ ];
+
+ let schemas = Schema::parse_list(schema_strs).unwrap();
+ assert!(SchemaCompatibility::can_read(&schemas[0],
&schemas[1]).is_err());
+ assert!(SchemaCompatibility::can_read(&schemas[1],
&schemas[0]).is_err());
+ SchemaCompatibility::can_read(&schemas[1], &schemas[1])?;
+ SchemaCompatibility::can_read(&schemas[0], &schemas[0])?;
+
+ Ok(())
+ }
+
+ #[test]
+ fn avro_rs_342_decimal_fixed_and_bytes() -> TestResult {
+ let bytes = Schema::Decimal(DecimalSchema {
+ precision: 20,
+ scale: 0,
+ inner: InnerDecimalSchema::Bytes,
+ });
+ let fixed = Schema::Decimal(DecimalSchema {
+ precision: 20,
+ scale: 0,
+ inner: InnerDecimalSchema::Fixed(FixedSchema {
+ name: Name::new("DecimalFixed")?,
+ aliases: None,
+ doc: None,
+ size: 20,
+ default: None,
+ attributes: BTreeMap::default(),
+ }),
+ });
+
+ assert_eq!(
+ Compatibility::Full,
+ SchemaCompatibility::mutual_read(&bytes, &fixed)?
+ );
+
+ let value = Value::Decimal(Decimal::from(vec![1; 10]));
+ let fixed_value = value.clone().resolve(&fixed)?;
+ let bytes_value = value.resolve(&bytes)?;
+
+ assert_eq!(fixed_value, bytes_value);
+
+ Ok(())
+ }
}
diff --git a/avro/src/types.rs b/avro/src/types.rs
index 85a26ca..4c659c5 100644
--- a/avro/src/types.rs
+++ b/avro/src/types.rs
@@ -448,9 +448,28 @@ impl Value {
(&Value::Double(_), &Schema::Double) => None,
(&Value::Bytes(_), &Schema::Bytes) => None,
(&Value::Bytes(_), &Schema::Decimal { .. }) => None,
- (&Value::Bytes(_), &Schema::Uuid(UuidSchema::Bytes)) => None,
+ (Value::Bytes(bytes), &Schema::Uuid(UuidSchema::Bytes)) => {
+ if bytes.len() != 16 {
+ Some(format!(
+ "The value's size ({}) is not the right length for a
bytes UUID (16)",
+ bytes.len()
+ ))
+ } else {
+ None
+ }
+ }
(&Value::String(_), &Schema::String) => None,
- (&Value::String(_), &Schema::Uuid(UuidSchema::String)) => None,
+ (Value::String(string), &Schema::Uuid(UuidSchema::String)) => {
+ // Non-hyphenated is 32 characters, hyphenated is longer
+ if string.len() < 32 {
+ Some(format!(
+ "The value's size ({}) is not the right length for a
string UUID (>=32)",
+ string.len()
+ ))
+ } else {
+ None
+ }
+ }
(&Value::Fixed(n, _), &Schema::Fixed(FixedSchema { size, .. })) =>
{
if n != size {
Some(format!(
@@ -1182,10 +1201,11 @@ impl Value {
fn try_u8(self) -> AvroResult<u8> {
let int = self.resolve(&Schema::Int)?;
- if let Value::Int(n) = int {
- if n >= 0 && n <= i32::from(u8::MAX) {
- return Ok(n as u8);
- }
+ if let Value::Int(n) = int
+ && n >= 0
+ && n <= i32::from(u8::MAX)
+ {
+ return Ok(n as u8);
}
Err(Details::GetU8(int).into())
diff --git a/avro/tests/shared.rs b/avro/tests/shared.rs
index f23c837..b240983 100644
--- a/avro/tests/shared.rs
+++ b/avro/tests/shared.rs
@@ -17,6 +17,7 @@
use apache_avro::{Codec, Reader, Schema, Writer, types::Value};
use apache_avro_test_helper::TestResult;
+use std::path::PathBuf;
use std::{
fmt,
fs::{DirEntry, File, ReadDir},
@@ -43,17 +44,16 @@ fn test_schema() -> TestResult {
Err(e) => core::panic!("Can't get file {}", e),
};
log::debug!("{:?}", entry.file_name());
- if let Ok(ft) = entry.file_type() {
- if ft.is_dir() {
- let sub_folder =
- ROOT_DIRECTORY.to_owned() + "/" +
entry.file_name().to_str().unwrap();
-
- let dir_result = test_folder(sub_folder.as_str());
- if let Err(ed) = dir_result {
- result = match result {
- Ok(()) => Err(ed),
- Err(e) => Err(e.merge(&ed)),
- }
+ if let Ok(ft) = entry.file_type()
+ && ft.is_dir()
+ {
+ let sub_folder =
PathBuf::from(ROOT_DIRECTORY).join(entry.file_name());
+
+ let dir_result = test_folder(&sub_folder);
+ if let Err(ed) = dir_result {
+ result = match result {
+ Ok(()) => Err(ed),
+ Err(e) => Err(e.merge(&ed)),
}
}
}
@@ -96,19 +96,18 @@ impl fmt::Display for ErrorsDesc {
}
}
-fn test_folder(folder: &str) -> Result<(), ErrorsDesc> {
- let file_name = folder.to_owned() + "/schema.json";
+fn test_folder(folder: &Path) -> Result<(), ErrorsDesc> {
+ let file_name = folder.join("schema.json");
let content = std::fs::read_to_string(file_name).expect("Unable to find
schema.json file");
let schema: Schema = Schema::parse_str(content.as_str()).expect("Can't
read schema");
- let data_file_name = folder.to_owned() + "/data.avro";
- let data_path: &Path = Path::new(data_file_name.as_str());
+ let data_path = folder.join("data.avro");
let mut result = Ok(());
if !data_path.exists() {
- log::error!("folder {folder} does not exist");
+ log::error!("folder {folder:?} does not exist");
return Err(ErrorsDesc::new(
- format!("folder {folder} does not exist").as_str(),
+ format!("folder {folder:?} does not exist").as_str(),
));
} else {
let file: File = File::open(data_path).expect("Can't open data.avro");
@@ -137,11 +136,11 @@ fn test_folder(folder: &str) -> Result<(), ErrorsDesc> {
if original != &record {
result = match result {
Ok(_) => Result::Err(ErrorsDesc::new(
- format!("Records are not equals for folder :
{folder}").as_str(),
+ format!("Records are not equals for folder :
{folder:?}").as_str(),
)),
- Err(e) => {
- Err(e.add(format!("Records are not equals for folder :
{folder}").as_str()))
- }
+ Err(e) => Err(
+ e.add(format!("Records are not equals for folder :
{folder:?}").as_str())
+ ),
}
}
}