This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 5dd34630c7 Add arrow-avro `SchemaStore` and fingerprinting (#8039)
5dd34630c7 is described below
commit 5dd34630c742f3cf78f539245a6fbfdd92dde891
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Aug 5 14:27:50 2025 -0500
Add arrow-avro `SchemaStore` and fingerprinting (#8039)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/4886
- Pre-work for https://github.com/apache/arrow-rs/pull/8006
# Rationale for this change
Apache Avro’s [single object
encoding](https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding)
prefixes every record with the marker `0xC3 0x01` followed by a `Rabin`
[schema fingerprint
](https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints)
so that readers can identify the correct writer schema without carrying
the full definition in each message.
While the current `arrow‑avro` implementation can read container files,
it cannot ingest these framed messages or handle streams where the
writer schema changes over time.
The Avro specification recommends computing a 64‑bit CRC‑64‑AVRO (Rabin)
hashed fingerprint of the [parsed canonical form of a
schema](https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas)
to look up the `Schema` from a local schema store or registry.
This PR introduces **`SchemaStore`** and **fingerprinting** to enable:
* **Zero‑copy schema identification** for decoding streaming Avro
messages published in single‑object format (e.g. via Kafka or Pulsar)
into Arrow.
* **Dynamic schema evolution** by laying the foundation to resolve
writer/reader schema differences on the fly.
**NOTE:** Integration with `Decoder` and `Reader` coming in next PR.
# What changes are included in this PR?
| Area | Highlights |
| ------------------- | ---------- |
| **`schema.rs`** | *New* `Fingerprint`, `SchemaStore`, and `SINGLE_OBJECT_MAGIC`; canonical‑form generator; Rabin fingerprint calculator; `compare_schemas` helper. |
| **`lib.rs`** | `mod schema` is now `pub` |
| **Unit tests** | New tests covering fingerprint generation, store registration/lookup, unknown‑fingerprint errors, and interaction with UTF8‑view decoding. |
| **Docs & Examples** | Extensive inline docs with examples on all new public methods / structs. |
# Are these changes tested?
Yes. New tests cover:
1. **Fingerprinting** against the canonical examples from the Avro spec
2. **`SchemaStore` behavior**: deduplication, duplicate registration, and
lookup.
# Are there any user-facing changes?
N/A
---
arrow-avro/Cargo.toml | 1 +
arrow-avro/src/lib.rs | 8 +-
arrow-avro/src/schema.rs | 560 ++++++++++++++++++++++++++++++++++++++++++++++-
3 files changed, 564 insertions(+), 5 deletions(-)
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index e2280b251f..8db404923c 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -56,6 +56,7 @@ bzip2 = { version = "0.6.0", optional = true }
xz = { version = "0.1", default-features = false, optional = true }
crc = { version = "3.0", optional = true }
uuid = "1.17"
+strum_macros = "0.27"
[dev-dependencies]
arrow-data = { workspace = true }
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
index ae13c38618..8087a908d6 100644
--- a/arrow-avro/src/lib.rs
+++ b/arrow-avro/src/lib.rs
@@ -33,10 +33,10 @@
/// Implements the primary reader interface and record decoding logic.
pub mod reader;
-// Avro schema parsing and representation
-//
-// Provides types for parsing and representing Avro schema definitions.
-mod schema;
+/// Avro schema parsing and representation
+///
+/// Provides types for parsing and representing Avro schema definitions.
+pub mod schema;
/// Compression codec implementations for Avro
///
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index c3e4549c8c..539e7b02f3 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -15,12 +15,28 @@
// specific language governing permissions and limitations
// under the License.
+use arrow_schema::ArrowError;
use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+use std::cmp::PartialEq;
+use std::collections::hash_map::Entry;
use std::collections::HashMap;
+use strum_macros::AsRefStr;
/// The metadata key used for storing the JSON encoded [`Schema`]
pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
+/// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`)
+pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];
+
+/// Reports whether two Avro schemas are logically identical.
+///
+/// Both schemas are rendered to their Parsing Canonical Form and the two
+/// resulting strings are compared for equality.
+///
+/// # Errors
+///
+/// Propagates any error produced while generating either canonical form.
+pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result<bool, ArrowError> {
+    Ok(generate_canonical_form(writer)? == generate_canonical_form(reader)?)
+}
+
/// Either a [`PrimitiveType`] or a reference to a previously defined named
type
///
/// <https://avro.apache.org/docs/1.11.1/specification/#names>
@@ -39,8 +55,9 @@ pub enum TypeName<'a> {
/// A primitive type
///
/// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, AsRefStr)]
#[serde(rename_all = "camelCase")]
+#[strum(serialize_all = "lowercase")]
pub enum PrimitiveType {
/// null: no value
Null,
@@ -260,6 +277,376 @@ pub struct Fixed<'a> {
pub attributes: Attributes<'a>,
}
+/// An Avro schema held as its JSON text.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct AvroSchema {
+    /// The JSON text of the Avro schema.
+    pub json_string: String,
+}
+
+impl AvroSchema {
+    /// Wraps `json_string` in an `AvroSchema`. The text is stored as-is and is
+    /// not parsed at this point.
+    pub fn new(json_string: String) -> Self {
+        Self { json_string }
+    }
+
+    /// Parses the stored JSON text into a [`Schema`].
+    ///
+    /// The returned schema borrows from `self`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ArrowError::ParseError` if the stored text cannot be
+    /// deserialized as a schema.
+    pub fn schema(&self) -> Result<Schema<'_>, ArrowError> {
+        match serde_json::from_str(&self.json_string) {
+            Ok(parsed) => Ok(parsed),
+            Err(e) => Err(ArrowError::ParseError(format!(
+                "Invalid Avro schema JSON: {e}"
+            ))),
+        }
+    }
+
+    /// Parses the schema and returns its Rabin fingerprint.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the JSON cannot be parsed or the fingerprint cannot be generated.
+    pub fn fingerprint(&self) -> Result<Fingerprint, ArrowError> {
+        let parsed = self.schema()?;
+        generate_fingerprint_rabin(&parsed)
+    }
+}
+
+/// Supported fingerprint algorithms for Avro schema identification.
+///
+/// Currently only `Rabin` is supported; `SHA256` and `MD5` support will come
+/// in a future update.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
+pub enum FingerprintAlgorithm {
+ /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. This is the default algorithm.
+ #[default]
+ Rabin,
+}
+
+/// A schema fingerprint in one of the supported formats.
+///
+/// This is the key type of the `HashMap` held inside `SchemaStore`. Each
+/// `SchemaStore` instance always stores only one variant, matching its
+/// configured `FingerprintAlgorithm`, but the enum makes the API uniform.
+/// Currently only `Rabin` is supported.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum Fingerprint {
+ /// A 64-bit Rabin fingerprint.
+ Rabin(u64),
+}
+
+/// Maps a fingerprint to the algorithm that produced it.
+impl From<&Fingerprint> for FingerprintAlgorithm {
+    fn from(fp: &Fingerprint) -> Self {
+        // Kept as an exhaustive match so that adding a variant forces an update here.
+        match *fp {
+            Fingerprint::Rabin(_) => Self::Rabin,
+        }
+    }
+}
+
+/// Fingerprints `schema` with the algorithm selected by `hash_type`.
+///
+/// The schema is first rendered to its canonical form; the fingerprint is then
+/// computed over that string.
+///
+/// # Errors
+///
+/// Returns `ArrowError::ComputeError` if the canonical form cannot be generated.
+pub(crate) fn generate_fingerprint(
+    schema: &Schema,
+    hash_type: FingerprintAlgorithm,
+) -> Result<Fingerprint, ArrowError> {
+    let canonical = match generate_canonical_form(schema) {
+        Ok(form) => form,
+        Err(e) => {
+            return Err(ArrowError::ComputeError(format!(
+                "Failed to generate canonical form for schema: {e}"
+            )));
+        }
+    };
+    let fingerprint = match hash_type {
+        FingerprintAlgorithm::Rabin => Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)),
+    };
+    Ok(fingerprint)
+}
+
+/// Generates the 64-bit Rabin fingerprint for the given `Schema`.
+///
+/// The fingerprint is computed from the canonical form of the schema.
+/// This is also known as `CRC-64-AVRO`.
+///
+/// # Returns
+/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint.
+///
+/// # Errors
+/// Returns an `ArrowError` if the canonical form of the schema cannot be
+/// generated.
+pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint,
ArrowError> {
+ generate_fingerprint(schema, FingerprintAlgorithm::Rabin)
+}
+
+/// Generates the Parsing Canonical Form for the given [`Schema`].
+///
+/// The canonical form is a standardized JSON representation of the schema,
+/// primarily used for generating a schema fingerprint for equality checking.
+///
+/// This form strips attributes that do not affect the schema's identity,
+/// such as `doc` fields, `aliases`, and any properties not defined in the
+/// Avro specification.
+///
+/// The schema is treated as a top-level schema, i.e. it is rendered with no
+/// enclosing namespace.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
+pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
+ build_canonical(schema, None)
+}
+
+/// An in-memory cache of Avro schemas, indexed by their fingerprint.
+///
+/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas
+/// efficiently. Each schema is associated with a [`Fingerprint`], which is
+/// generated based on the schema's canonical form and a specific hashing
+/// algorithm.
+///
+/// A `SchemaStore` instance is configured to use a single
+/// [`FingerprintAlgorithm`] such as Rabin, MD5 (not yet supported), or SHA256
+/// (not yet supported) for all its operations. This ensures consistency when
+/// generating fingerprints and looking up schemas. All schemas registered will
+/// have their fingerprint computed with this algorithm, and lookups must use a
+/// matching fingerprint.
+///
+/// # Examples
+///
+/// ```no_run
+/// // Create a new store with the default Rabin fingerprinting.
+/// use arrow_avro::schema::{AvroSchema, SchemaStore};
+///
+/// let mut store = SchemaStore::new();
+/// let schema = AvroSchema::new("\"string\"".to_string());
+/// // Register the schema to get its fingerprint.
+/// let fingerprint = store.register(schema.clone()).unwrap();
+/// // Use the fingerprint to look up the schema.
+/// let retrieved_schema = store.lookup(&fingerprint).cloned();
+/// assert_eq!(retrieved_schema, Some(schema));
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct SchemaStore {
+ /// The hashing algorithm used for generating fingerprints.
+ fingerprint_algorithm: FingerprintAlgorithm,
+ /// A map from a schema's fingerprint to the schema itself.
+ schemas: HashMap<Fingerprint, AvroSchema>,
+}
+
+impl TryFrom<&[AvroSchema]> for SchemaStore {
+    type Error = ArrowError;
+
+    /// Builds a `SchemaStore` by registering a clone of every schema in
+    /// `schemas`, in order, with a freshly created store.
+    ///
+    /// Stops at, and returns, the first registration error.
+    fn try_from(schemas: &[AvroSchema]) -> Result<Self, Self::Error> {
+        schemas
+            .iter()
+            .cloned()
+            .try_fold(SchemaStore::new(), |mut store, schema| {
+                store.register(schema)?;
+                Ok(store)
+            })
+    }
+}
+
+impl SchemaStore {
+    /// Creates an empty `SchemaStore` using the default fingerprinting
+    /// algorithm (64-bit Rabin).
+    pub fn new() -> Self {
+        Self {
+            fingerprint_algorithm: FingerprintAlgorithm::default(),
+            schemas: HashMap::default(),
+        }
+    }
+
+    /// Registers a schema with the store and returns its fingerprint.
+    ///
+    /// The fingerprint is computed with the store's configured algorithm. If
+    /// no schema is stored under that fingerprint yet, `schema` is inserted.
+    /// If an equal `AvroSchema` (same JSON text) is already stored under it,
+    /// the store is left unchanged.
+    ///
+    /// # Arguments
+    ///
+    /// * `schema` - The `AvroSchema` to register.
+    ///
+    /// # Errors
+    ///
+    /// Returns an `ArrowError` if the schema JSON cannot be parsed, if the
+    /// fingerprint cannot be generated, or if a *different* `AvroSchema` is
+    /// already stored under the same fingerprint (reported as a collision).
+    pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
+        let fingerprint = generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
+        match self.schemas.entry(fingerprint) {
+            Entry::Vacant(slot) => {
+                slot.insert(schema);
+            }
+            // Same fingerprint and same JSON text: nothing to do.
+            Entry::Occupied(existing) if existing.get() == &schema => {}
+            Entry::Occupied(_) => {
+                return Err(ArrowError::ComputeError(format!(
+                    "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
+                )));
+            }
+        }
+        Ok(fingerprint)
+    }
+
+    /// Looks up a schema by its `Fingerprint`.
+    ///
+    /// # Arguments
+    ///
+    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to look up.
+    ///
+    /// # Returns
+    ///
+    /// `Some` with a reference to the stored `AvroSchema` if found, otherwise `None`.
+    pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
+        self.schemas.get(fingerprint)
+    }
+
+    /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently
+    /// held by this [`SchemaStore`].
+    ///
+    /// The order of the returned fingerprints is unspecified and should not be
+    /// relied upon.
+    pub fn fingerprints(&self) -> Vec<Fingerprint> {
+        let mut held = Vec::with_capacity(self.schemas.len());
+        held.extend(self.schemas.keys().copied());
+        held
+    }
+
+    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting.
+    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
+        self.fingerprint_algorithm
+    }
+}
+
+/// Renders `s` as a JSON string literal (quoted and escaped).
+///
+/// Returns `ArrowError::ComputeError` if serialization fails.
+fn quote(s: &str) -> Result<String, ArrowError> {
+    match serde_json::to_string(s) {
+        Ok(quoted) => Ok(quoted),
+        Err(e) => Err(ArrowError::ComputeError(format!(
+            "Failed to quote string: {e}"
+        ))),
+    }
+}
+
+// Avro names are defined by a `name` and an optional `namespace`.
+// The full name is composed of the namespace and the name, separated by a dot.
+//
+// Avro specification defines two ways to specify a full name:
+// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
+//    In this case, the `namespace` attribute is ignored.
+// 2. The `name` attribute contains the simple name (e.g., "d") and the
+//    `namespace` attribute contains the namespace (e.g., "a.b.c").
+//
+// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
+// Complex paths with quotes or backticks like `a."hi".b` are not supported.
+//
+// This function constructs the full name and extracts the namespace,
+// handling both ways of specifying the name. It prioritizes a namespace
+// defined within the `name` attribute itself, then the explicit
+// `namespace_attr`, and finally the `enclosing_ns`.
+//
+// The Avro specification allows the empty string as a namespace to denote
+// the null namespace. An empty namespace (whether it comes from
+// `namespace_attr`, `enclosing_ns`, or a leading dot in `name`) therefore
+// yields the bare simple name and a namespace of `None`, never ".name".
+//
+// Returns `(full_name, namespace)`.
+fn make_full_name(
+    name: &str,
+    namespace_attr: Option<&str>,
+    enclosing_ns: Option<&str>,
+) -> (String, Option<String>) {
+    // If `name` already contains a dot, treat it as a full name and take the
+    // namespace from it, ignoring the other namespace sources.
+    let (namespace, simple_name) = match name.rsplit_once('.') {
+        Some((ns, simple)) => (Some(ns), simple),
+        None => (namespace_attr.or(enclosing_ns), name),
+    };
+    // An empty namespace means "no namespace".
+    match namespace.filter(|ns| !ns.is_empty()) {
+        Some(ns) => (format!("{ns}.{simple_name}"), Some(ns.to_string())),
+        None => (simple_name.to_string(), None),
+    }
+}
+
+/// Recursively renders `schema` as a canonical-form JSON string.
+///
+/// `enclosing_ns` is the namespace inherited from the surrounding named type
+/// (`None` at the top level); it is used to qualify names that do not carry a
+/// namespace of their own.
+///
+/// Returns an error if quoting a name or symbol as a JSON string fails.
+fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) ->
Result<String, ArrowError> {
+ Ok(match schema {
+ // A bare type name, or a type name with attributes: the attributes are
+ // ignored and only the quoted name is emitted.
+ Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match
tn {
+ TypeName::Primitive(pt) => quote(pt.as_ref())?,
+ // A reference to a named type is emitted as its full name.
+ TypeName::Ref(name) => {
+ let (full_name, _) = make_full_name(name, None, enclosing_ns);
+ quote(&full_name)?
+ }
+ },
+ // A union is a JSON array of its branches, in declaration order.
+ Schema::Union(branches) => format!(
+ "[{}]",
+ branches
+ .iter()
+ .map(|b| build_canonical(b, enclosing_ns))
+ .collect::<Result<Vec<_>, _>>()?
+ .join(",")
+ ),
+ Schema::Complex(ct) => match ct {
+ // Records emit `name`, `type`, `fields`; each field emits only its
+ // `name` and `type`.
+ ComplexType::Record(r) => {
+ let (full_name, child_ns) = make_full_name(r.name,
r.namespace, enclosing_ns);
+ let fields = r
+ .fields
+ .iter()
+ .map(|f| {
+ // Field types are resolved against the record's own
+ // namespace, falling back to the enclosing one.
+ let field_type =
+ build_canonical(&f.r#type,
child_ns.as_deref().or(enclosing_ns))?;
+ Ok(format!(
+ r#"{{"name":{},"type":{}}}"#,
+ quote(f.name)?,
+ field_type
+ ))
+ })
+ .collect::<Result<Vec<_>, ArrowError>>()?
+ .join(",");
+ format!(
+ r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
+ quote(&full_name)?,
+ )
+ }
+ // Enums emit `name`, `type`, `symbols`.
+ ComplexType::Enum(e) => {
+ let (full_name, _) = make_full_name(e.name, e.namespace,
enclosing_ns);
+ let symbols = e
+ .symbols
+ .iter()
+ .map(|s| quote(s))
+ .collect::<Result<Vec<_>, _>>()?
+ .join(",");
+ format!(
+ r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
+ quote(&full_name)?
+ )
+ }
+ // Arrays and maps are unnamed; their element type keeps the
+ // caller's enclosing namespace.
+ ComplexType::Array(arr) => format!(
+ r#"{{"type":"array","items":{}}}"#,
+ build_canonical(&arr.items, enclosing_ns)?
+ ),
+ ComplexType::Map(map) => format!(
+ r#"{{"type":"map","values":{}}}"#,
+ build_canonical(&map.values, enclosing_ns)?
+ ),
+ // Fixed types emit `name`, `type`, `size`.
+ ComplexType::Fixed(f) => {
+ let (full_name, _) = make_full_name(f.name, f.namespace,
enclosing_ns);
+ format!(
+ r#"{{"name":{},"type":"fixed","size":{}}}"#,
+ quote(&full_name)?,
+ f.size
+ )
+ }
+ },
+ })
+}
+
+/// Seed constant of the 64-bit Rabin fingerprint described in the Avro spec.
+///
+/// It is both the starting value of every fingerprint and the constant
+/// XOR-ed in while building the lookup table.
+const EMPTY: u64 = 0xc15d_213a_a4d7_a795;
+
+/// Computes entry `i` of the lookup table by running eight shift/XOR rounds.
+///
+/// A `while` loop is used because `for` loops rely on `Iterator::next`, which
+/// is not `const` on stable Rust (`const_for`, tracking issue #87575).
+const fn one_entry(i: usize) -> u64 {
+    let mut value = i as u64;
+    let mut round = 0;
+    while round < 8 {
+        let shifted = value >> 1;
+        // XOR in the constant only when the bit shifted out was set.
+        value = if value & 1 == 1 { shifted ^ EMPTY } else { shifted };
+        round += 1;
+    }
+    value
+}
+
+/// Builds the full 256-entry lookup table at compile time.
+///
+/// Uses a `while` loop for the same `const fn` reason as [`one_entry`].
+const fn build_table() -> [u64; 256] {
+    let mut entries = [0u64; 256];
+    let mut slot = 0;
+    while slot < 256 {
+        entries[slot] = one_entry(slot);
+        slot += 1;
+    }
+    entries
+}
+
+/// The pre-computed lookup table, one entry per possible byte value.
+static FINGERPRINT_TABLE: [u64; 256] = build_table();
+
+/// Computes the 64-bit Rabin fingerprint of a canonical schema string.
+///
+/// Starts from [`EMPTY`] and folds in the UTF-8 bytes of `canonical_form` one
+/// at a time via the lookup table; an empty string therefore yields `EMPTY`.
+pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
+    canonical_form.bytes().fold(EMPTY, |fp, byte| {
+        let idx = usize::from((fp as u8) ^ byte);
+        (fp >> 8) ^ FINGERPRINT_TABLE[idx]
+    })
+}
+
#[cfg(test)]
mod tests {
use super::*;
@@ -267,6 +654,34 @@ mod tests {
use arrow_schema::{DataType, Fields, TimeUnit};
use serde_json::json;
+ fn int_schema() -> Schema<'static> {
+ Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
+ }
+
+ fn record_schema() -> Schema<'static> {
+ Schema::Complex(ComplexType::Record(Record {
+ name: "record1",
+ namespace: Some("test.namespace"),
+ doc: Some("A test record"),
+ aliases: vec![],
+ fields: vec![
+ Field {
+ name: "field1",
+ doc: Some("An integer field"),
+ r#type: int_schema(),
+ default: None,
+ },
+ Field {
+ name: "field2",
+ doc: None,
+ r#type:
Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+ default: None,
+ },
+ ],
+ attributes: Attributes::default(),
+ }))
+ }
+
#[test]
fn test_deserialize() {
let t: Schema = serde_json::from_str("\"string\"").unwrap();
@@ -562,4 +977,147 @@ mod tests {
}))
);
}
+
+ #[test]
+ fn test_new_schema_store() {
+ let store = SchemaStore::new();
+ assert!(store.schemas.is_empty());
+ }
+
+ #[test]
+ fn test_try_from_schemas_rabin() {
+ let int_avro_schema =
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+ let record_avro_schema =
AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
+ let schemas = vec![int_avro_schema.clone(),
record_avro_schema.clone()];
+ let store = SchemaStore::try_from(schemas.as_slice()).unwrap();
+ let int_fp = int_avro_schema.fingerprint().unwrap();
+ assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
+ let rec_fp = record_avro_schema.fingerprint().unwrap();
+ assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
+ }
+
+ #[test]
+ fn test_try_from_with_duplicates() {
+ let int_avro_schema =
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+ let record_avro_schema =
AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
+ let schemas = vec![
+ int_avro_schema.clone(),
+ record_avro_schema,
+ int_avro_schema.clone(),
+ ];
+ let store = SchemaStore::try_from(schemas.as_slice()).unwrap();
+ assert_eq!(store.schemas.len(), 2);
+ let int_fp = int_avro_schema.fingerprint().unwrap();
+ assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
+ }
+
+ #[test]
+ fn test_register_and_lookup_rabin() {
+ let mut store = SchemaStore::new();
+ let schema =
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+ let fp_enum = store.register(schema.clone()).unwrap();
+ let Fingerprint::Rabin(fp_val) = fp_enum;
+ assert_eq!(
+ store.lookup(&Fingerprint::Rabin(fp_val)).cloned(),
+ Some(schema.clone())
+ );
+ assert!(store
+ .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)))
+ .is_none());
+ }
+
+ #[test]
+ fn test_register_duplicate_schema() {
+ let mut store = SchemaStore::new();
+ let schema1 =
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+ let schema2 =
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+ let fingerprint1 = store.register(schema1).unwrap();
+ let fingerprint2 = store.register(schema2).unwrap();
+ assert_eq!(fingerprint1, fingerprint2);
+ assert_eq!(store.schemas.len(), 1);
+ }
+
+ #[test]
+ fn test_canonical_form_generation_primitive() {
+ let schema = int_schema();
+ let canonical_form = generate_canonical_form(&schema).unwrap();
+ assert_eq!(canonical_form, r#""int""#);
+ }
+
+ #[test]
+ fn test_canonical_form_generation_record() {
+ let schema = record_schema();
+ let expected_canonical_form =
r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
+ let canonical_form = generate_canonical_form(&schema).unwrap();
+ assert_eq!(canonical_form, expected_canonical_form);
+ }
+
+ #[test]
+ fn test_fingerprint_calculation() {
+ let canonical_form =
r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
+ let expected_fingerprint = 10505236152925314060;
+ let fingerprint = compute_fingerprint_rabin(canonical_form);
+ assert_eq!(fingerprint, expected_fingerprint);
+ }
+
+ #[test]
+ fn test_register_and_lookup_complex_schema() {
+ let mut store = SchemaStore::new();
+ let schema =
AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
+ let canonical_form =
r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
+ let expected_fingerprint =
+
Fingerprint::Rabin(super::compute_fingerprint_rabin(canonical_form));
+ let fingerprint = store.register(schema.clone()).unwrap();
+ assert_eq!(fingerprint, expected_fingerprint);
+ let looked_up = store.lookup(&fingerprint).cloned();
+ assert_eq!(looked_up, Some(schema));
+ }
+
+ #[test]
+ fn test_fingerprints_returns_all_keys() {
+ let mut store = SchemaStore::new();
+ let fp_int = store
+ .register(AvroSchema::new(
+ serde_json::to_string(&int_schema()).unwrap(),
+ ))
+ .unwrap();
+ let fp_record = store
+ .register(AvroSchema::new(
+ serde_json::to_string(&record_schema()).unwrap(),
+ ))
+ .unwrap();
+ let fps = store.fingerprints();
+ assert_eq!(fps.len(), 2);
+ assert!(fps.contains(&fp_int));
+ assert!(fps.contains(&fp_record));
+ }
+
+ #[test]
+ fn test_canonical_form_strips_attributes() {
+ let schema_with_attrs = Schema::Complex(ComplexType::Record(Record {
+ name: "record_with_attrs",
+ namespace: None,
+ doc: Some("This doc should be stripped"),
+ aliases: vec!["alias1", "alias2"],
+ fields: vec![Field {
+ name: "f1",
+ doc: Some("field doc"),
+ r#type: Schema::Type(Type {
+ r#type: TypeName::Primitive(PrimitiveType::Bytes),
+ attributes: Attributes {
+ logical_type: Some("decimal"),
+ additional: HashMap::from([("precision", json!(4))]),
+ },
+ }),
+ default: None,
+ }],
+ attributes: Attributes {
+ logical_type: None,
+ additional: HashMap::from([("custom_attr", json!("value"))]),
+ },
+ }));
+ let expected_canonical_form =
r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#;
+ let canonical_form =
generate_canonical_form(&schema_with_attrs).unwrap();
+ assert_eq!(canonical_form, expected_canonical_form);
+ }
}