This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 5dd34630c7 Add arrow-avro `SchemaStore` and fingerprinting (#8039)
5dd34630c7 is described below

commit 5dd34630c742f3cf78f539245a6fbfdd92dde891
Author: Connor Sanders <[email protected]>
AuthorDate: Tue Aug 5 14:27:50 2025 -0500

    Add arrow-avro `SchemaStore` and fingerprinting (#8039)
    
    # Which issue does this PR close?
    
    - Part of https://github.com/apache/arrow-rs/issues/4886
    
    - Pre-work for https://github.com/apache/arrow-rs/pull/8006
    
    # Rationale for this change
    
    Apache Avro’s [single object
    encoding](https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding)
    prefixes every record with the marker `0xC3 0x01` followed by a `Rabin`
    [schema fingerprint](https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints)
    so that readers can identify the correct writer schema without carrying
    the full definition in each message.
    While the current `arrow‑avro` implementation can read container files,
    it cannot ingest these framed messages or handle streams where the
    writer schema changes over time.
    
    The Avro specification recommends computing a 64‑bit CRC‑64‑AVRO (Rabin)
    fingerprint of the [parsing canonical form of a
    schema](https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas)
    to look up the `Schema` from a local schema store or registry.
    
    This PR introduces **`SchemaStore`** and **fingerprinting** to enable:
    
    * **Zero‑copy schema identification** for decoding streaming Avro
    messages published in single‑object format (e.g. Kafka, Pulsar, etc.)
    into Arrow.
    * **Dynamic schema evolution** by laying the foundation to resolve
    writer/reader schema differences on the fly.
    
    **NOTE:**  Integration with `Decoder` and `Reader` coming in next PR.
    
    # What changes are included in this PR?
    
    | Area | Highlights |
    | ---- | ---------- |
    | **`schema.rs`** | *New* `Fingerprint`, `SchemaStore`, and
    `SINGLE_OBJECT_MAGIC`; canonical‑form generator; Rabin fingerprint
    calculator; `compare_schemas` helper. |
    | **`lib.rs`** | `mod schema` is now `pub` |
    | **Unit tests** | New tests covering fingerprint generation, store
    registration/lookup, unknown‑fingerprint errors, and interaction with
    UTF8‑view decoding. |
    | **Docs & Examples** | Extensive inline docs with examples on all new
    public methods / structs. |
    
    
    # Are these changes tested?
    
    Yes.  New tests cover:
    
    1. **Fingerprinting** against the canonical examples from the Avro spec.
    2. **`SchemaStore` behavior**: registration, deduplication of duplicate
    registrations, and lookup.
    
    # Are there any user-facing changes?
    
    N/A
---
 arrow-avro/Cargo.toml    |   1 +
 arrow-avro/src/lib.rs    |   8 +-
 arrow-avro/src/schema.rs | 560 ++++++++++++++++++++++++++++++++++++++++++++++-
 3 files changed, 564 insertions(+), 5 deletions(-)

diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index e2280b251f..8db404923c 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -56,6 +56,7 @@ bzip2 = { version = "0.6.0", optional = true }
 xz = { version = "0.1", default-features = false, optional = true }
 crc = { version = "3.0", optional = true }
 uuid = "1.17"
+strum_macros = "0.27"
 
 [dev-dependencies]
 arrow-data = { workspace = true }
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
index ae13c38618..8087a908d6 100644
--- a/arrow-avro/src/lib.rs
+++ b/arrow-avro/src/lib.rs
@@ -33,10 +33,10 @@
 /// Implements the primary reader interface and record decoding logic.
 pub mod reader;
 
-// Avro schema parsing and representation
-//
-// Provides types for parsing and representing Avro schema definitions.
-mod schema;
+/// Avro schema parsing and representation
+///
+/// Provides types for parsing and representing Avro schema definitions.
+pub mod schema;
 
 /// Compression codec implementations for Avro
 ///
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index c3e4549c8c..539e7b02f3 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -15,12 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use arrow_schema::ArrowError;
 use serde::{Deserialize, Serialize};
+use serde_json::{json, Value};
+use std::cmp::PartialEq;
+use std::collections::hash_map::Entry;
 use std::collections::HashMap;
+use strum_macros::AsRefStr;
 
 /// The metadata key used for storing the JSON encoded [`Schema`]
 pub const SCHEMA_METADATA_KEY: &str = "avro.schema";
 
+/// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`)
+pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];
+
+/// Reports whether two Avro schemas are logically identical.
+///
+/// Both schemas are rendered to their Parsing Canonical Form and the resulting
+/// strings are compared, so anything the canonical form discards does not
+/// influence the result.
+///
+/// # Errors
+///
+/// Propagates any error raised while generating either canonical form.
+pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result<bool, ArrowError> {
+    Ok(generate_canonical_form(writer)? == generate_canonical_form(reader)?)
+}
+
 /// Either a [`PrimitiveType`] or a reference to a previously defined named 
type
 ///
 /// <https://avro.apache.org/docs/1.11.1/specification/#names>
@@ -39,8 +55,9 @@ pub enum TypeName<'a> {
 /// A primitive type
 ///
 /// <https://avro.apache.org/docs/1.11.1/specification/#primitive-types>
-#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, AsRefStr)]
 #[serde(rename_all = "camelCase")]
+#[strum(serialize_all = "lowercase")]
 pub enum PrimitiveType {
     /// null: no value
     Null,
@@ -260,6 +277,376 @@ pub struct Fixed<'a> {
     pub attributes: Attributes<'a>,
 }
 
+/// A wrapper for an Avro schema in its JSON string representation.
+///
+/// The JSON text is stored as given. The derived `PartialEq`/`Eq` therefore
+/// compare the JSON text itself, not the schema it describes.
+#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
+pub struct AvroSchema {
+    /// The Avro schema as a JSON string.
+    pub json_string: String,
+}
+
+impl AvroSchema {
+    /// Wraps the given JSON text in an `AvroSchema`; the text is not parsed here.
+    pub fn new(json_string: String) -> Self {
+        Self { json_string }
+    }
+
+    /// Parses the stored JSON text into a [`Schema`].
+    ///
+    /// The returned schema borrows from `self`.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ArrowError::ParseError` if the JSON text cannot be deserialized.
+    pub fn schema(&self) -> Result<Schema<'_>, ArrowError> {
+        match serde_json::from_str(&self.json_string) {
+            Ok(parsed) => Ok(parsed),
+            Err(e) => Err(ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}"))),
+        }
+    }
+
+    /// Parses the schema and returns its Rabin fingerprint.
+    pub fn fingerprint(&self) -> Result<Fingerprint, ArrowError> {
+        let parsed = self.schema()?;
+        generate_fingerprint_rabin(&parsed)
+    }
+}
+
+/// Supported fingerprint algorithms for Avro schema identification.
+///
+/// Currently only `Rabin` is supported; `SHA256` and `MD5` support will come
+/// in a future update. `Rabin` is also the `Default` value.
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)]
+pub enum FingerprintAlgorithm {
+    /// 64‑bit CRC‑64‑AVRO Rabin fingerprint.
+    #[default]
+    Rabin,
+}
+
+/// A schema fingerprint in one of the supported formats.
+///
+/// This is used as the key inside the `SchemaStore` `HashMap`. Each `SchemaStore`
+/// instance always stores only one variant, matching its configured
+/// `FingerprintAlgorithm`, but the enum makes the API uniform.
+/// Currently only `Rabin` is supported.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
+#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
+pub enum Fingerprint {
+    /// A 64-bit Rabin fingerprint.
+    Rabin(u64),
+}
+
+/// Allow easy extraction of the algorithm used to create a fingerprint.
+impl From<&Fingerprint> for FingerprintAlgorithm {
+    /// Maps each `Fingerprint` variant to the algorithm that produces it.
+    fn from(fp: &Fingerprint) -> Self {
+        match fp {
+            Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
+        }
+    }
+}
+
+/// Computes the fingerprint of `schema` using the requested `FingerprintAlgorithm`.
+///
+/// The schema is first rendered to its Parsing Canonical Form; that text is
+/// then hashed with the selected algorithm.
+///
+/// # Errors
+///
+/// Returns `ArrowError::ComputeError` if the canonical form cannot be generated.
+pub(crate) fn generate_fingerprint(
+    schema: &Schema,
+    hash_type: FingerprintAlgorithm,
+) -> Result<Fingerprint, ArrowError> {
+    let canonical = match generate_canonical_form(schema) {
+        Ok(text) => text,
+        Err(e) => {
+            return Err(ArrowError::ComputeError(format!(
+                "Failed to generate canonical form for schema: {e}"
+            )))
+        }
+    };
+    let fingerprint = match hash_type {
+        FingerprintAlgorithm::Rabin => Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)),
+    };
+    Ok(fingerprint)
+}
+
+/// Generates the 64-bit Rabin fingerprint for the given `Schema`.
+///
+/// The fingerprint is computed from the canonical form of the schema.
+/// This is also known as `CRC-64-AVRO`.
+///
+/// This is a convenience wrapper that calls `generate_fingerprint` with
+/// `FingerprintAlgorithm::Rabin`.
+///
+/// # Returns
+/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint.
+///
+/// # Errors
+/// Propagates any error returned by `generate_fingerprint`.
+pub fn generate_fingerprint_rabin(schema: &Schema) -> Result<Fingerprint, 
ArrowError> {
+    generate_fingerprint(schema, FingerprintAlgorithm::Rabin)
+}
+
+/// Generates the Parsing Canonical Form for the given [`Schema`].
+///
+/// The canonical form is a standardized JSON representation of the schema,
+/// primarily used for generating a schema fingerprint for equality checking.
+///
+/// This form strips attributes that do not affect the schema's identity,
+/// such as `doc` fields, `aliases`, and any properties not defined in the
+/// Avro specification.
+///
+/// Rendering starts at the top level, i.e. with no enclosing namespace.
+///
+/// <https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas>
+pub fn generate_canonical_form(schema: &Schema) -> Result<String, ArrowError> {
+    build_canonical(schema, None)
+}
+
+/// An in-memory cache of Avro schemas, indexed by their fingerprint.
+///
+/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas
+/// efficiently. Each schema is associated with a [`Fingerprint`], which is
+/// generated from the schema's canonical form and a specific hashing
+/// algorithm.
+///
+/// A `SchemaStore` instance is configured to use a single
+/// [`FingerprintAlgorithm`] such as Rabin, MD5 (not yet supported), or SHA256
+/// (not yet supported) for all its operations. This ensures consistency when
+/// generating fingerprints and looking up schemas. All schemas registered will
+/// have their fingerprint computed with this algorithm, and lookups must use a
+/// matching fingerprint.
+///
+/// # Examples
+///
+/// ```no_run
+/// // Create a new store with the default Rabin fingerprinting.
+/// use arrow_avro::schema::{AvroSchema, SchemaStore};
+///
+/// let mut store = SchemaStore::new();
+/// let schema = AvroSchema::new("\"string\"".to_string());
+/// // Register the schema to get its fingerprint.
+/// let fingerprint = store.register(schema.clone()).unwrap();
+/// // Use the fingerprint to look up the schema.
+/// let retrieved_schema = store.lookup(&fingerprint).cloned();
+/// assert_eq!(retrieved_schema, Some(schema));
+/// ```
+#[derive(Debug, Clone, Default)]
+pub struct SchemaStore {
+    /// The hashing algorithm used for generating fingerprints.
+    fingerprint_algorithm: FingerprintAlgorithm,
+    /// A map from a schema's fingerprint to the schema itself.
+    schemas: HashMap<Fingerprint, AvroSchema>,
+}
+
+impl TryFrom<&[AvroSchema]> for SchemaStore {
+    type Error = ArrowError;
+
+    /// Builds a `SchemaStore` by registering a clone of every schema in the
+    /// slice, in order.
+    ///
+    /// Stops at, and returns, the first registration error.
+    fn try_from(schemas: &[AvroSchema]) -> Result<Self, Self::Error> {
+        schemas
+            .iter()
+            .cloned()
+            .try_fold(SchemaStore::new(), |mut store, schema| {
+                store.register(schema)?;
+                Ok(store)
+            })
+    }
+}
+
+impl SchemaStore {
+    /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin).
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Registers a schema with the store and returns its fingerprint.
+    ///
+    /// A fingerprint is calculated for the given schema using the store's
+    /// configured fingerprint algorithm. If no schema with that fingerprint is
+    /// stored yet, the new schema is inserted. If the fingerprint already
+    /// exists and the stored schema has the same canonical form, the stored
+    /// schema is kept as-is (it is not overwritten) and the fingerprint is
+    /// returned.
+    ///
+    /// # Arguments
+    ///
+    /// * `schema` - The `AvroSchema` to register.
+    ///
+    /// # Returns
+    ///
+    /// A `Result` containing the `Fingerprint` of the schema if successful,
+    /// or an `ArrowError` on failure.
+    ///
+    /// # Errors
+    ///
+    /// * The schema JSON cannot be parsed, or its canonical form cannot be
+    ///   generated.
+    /// * A *different* schema (different canonical form) is already stored
+    ///   under the same fingerprint, i.e. a genuine fingerprint collision.
+    pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint, ArrowError> {
+        let fingerprint = generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?;
+        match self.schemas.entry(fingerprint) {
+            Entry::Occupied(entry) => {
+                let existing = entry.get();
+                // The fingerprint is derived from the canonical form, so two JSON
+                // texts that differ only in ways the canonical form discards
+                // (whitespace, `doc`, ...) land on the same key. Comparing the raw
+                // JSON alone would misreport such a re-registration as a collision,
+                // so fall back to comparing canonical forms before rejecting.
+                if existing != &schema {
+                    let existing_canonical = generate_canonical_form(&existing.schema()?)?;
+                    let incoming_canonical = generate_canonical_form(&schema.schema()?)?;
+                    if existing_canonical != incoming_canonical {
+                        return Err(ArrowError::ComputeError(format!(
+                            "Schema fingerprint collision detected for fingerprint {fingerprint:?}"
+                        )));
+                    }
+                }
+            }
+            Entry::Vacant(entry) => {
+                entry.insert(schema);
+            }
+        }
+        Ok(fingerprint)
+    }
+
+    /// Looks up a schema by its `Fingerprint`.
+    ///
+    /// # Arguments
+    ///
+    /// * `fingerprint` - A reference to the `Fingerprint` of the schema to look up.
+    ///
+    /// # Returns
+    ///
+    /// An `Option` containing a reference to the stored `AvroSchema` if found,
+    /// otherwise `None`.
+    pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> {
+        self.schemas.get(fingerprint)
+    }
+
+    /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently
+    /// held by this [`SchemaStore`].
+    ///
+    /// The order of the returned fingerprints is unspecified and should not be
+    /// relied upon.
+    pub fn fingerprints(&self) -> Vec<Fingerprint> {
+        self.schemas.keys().copied().collect()
+    }
+
+    /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting.
+    pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm {
+        self.fingerprint_algorithm
+    }
+}
+
+/// Renders `s` as a JSON string literal using `serde_json::to_string`.
+///
+/// A serialization failure is reported as `ArrowError::ComputeError`.
+fn quote(s: &str) -> Result<String, ArrowError> {
+    match serde_json::to_string(s) {
+        Ok(quoted) => Ok(quoted),
+        Err(e) => Err(ArrowError::ComputeError(format!("Failed to quote string: {e}"))),
+    }
+}
+
+// Avro names are defined by a `name` and an optional `namespace`.
+// The full name is composed of the namespace and the name, separated by a dot.
+//
+// Avro specification defines two ways to specify a full name:
+// 1. The `name` attribute contains the full name (e.g., "a.b.c.d").
+//    In this case, the `namespace` attribute is ignored.
+// 2. The `name` attribute contains the simple name (e.g., "d") and the
+//    `namespace` attribute contains the namespace (e.g., "a.b.c").
+//
+// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`.
+// Complex paths with quotes or backticks like `a."hi".b` are not supported.
+//
+// This function constructs the full name and extracts the namespace,
+// handling both ways of specifying the name. It prioritizes a namespace
+// defined within the `name` attribute itself, then the explicit `namespace_attr`,
+// and finally the `enclosing_ns`.
+//
+// An empty namespace string denotes the null namespace: the simple name is
+// returned unqualified and no namespace is reported. An explicit empty
+// `namespace_attr` therefore also suppresses the `enclosing_ns` fallback.
+//
+// Returns `(full_name, namespace)`, where `namespace` is the namespace that
+// nested definitions inherit (or `None` for the null namespace).
+fn make_full_name(
+    name: &str,
+    namespace_attr: Option<&str>,
+    enclosing_ns: Option<&str>,
+) -> (String, Option<String>) {
+    // `name` already contains a dot then treat as full-name, ignore namespace.
+    if let Some((ns, _)) = name.rsplit_once('.') {
+        return (name.to_string(), Some(ns.to_string()));
+    }
+    // Without the filter an empty namespace would render as ".name".
+    match namespace_attr.or(enclosing_ns).filter(|ns| !ns.is_empty()) {
+        Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())),
+        None => (name.to_string(), None),
+    }
+}
+
+/// Recursively renders `schema` in Parsing Canonical Form.
+///
+/// `enclosing_ns` is the namespace inherited from the nearest enclosing named
+/// type; it qualifies names that carry no namespace of their own.
+fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result<String, ArrowError> {
+    let rendered = match schema {
+        // Bare type names and attributed types render the same way: attributes are dropped.
+        Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. }) => match tn {
+            TypeName::Primitive(primitive) => quote(primitive.as_ref())?,
+            TypeName::Ref(reference) => {
+                let (qualified, _) = make_full_name(reference, None, enclosing_ns);
+                quote(&qualified)?
+            }
+        },
+        Schema::Union(branches) => {
+            let mut parts = Vec::new();
+            for branch in branches.iter() {
+                parts.push(build_canonical(branch, enclosing_ns)?);
+            }
+            format!("[{}]", parts.join(","))
+        }
+        Schema::Complex(ComplexType::Record(record)) => {
+            let (qualified, record_ns) =
+                make_full_name(record.name, record.namespace, enclosing_ns);
+            // `make_full_name` has already fallen back to `enclosing_ns`, so the
+            // namespace it returns is the one the fields inherit.
+            let field_ns = record_ns.as_deref();
+            let mut parts = Vec::new();
+            for field in record.fields.iter() {
+                let field_type = build_canonical(&field.r#type, field_ns)?;
+                parts.push(format!(
+                    r#"{{"name":{},"type":{field_type}}}"#,
+                    quote(field.name)?
+                ));
+            }
+            let fields = parts.join(",");
+            format!(
+                r#"{{"name":{},"type":"record","fields":[{fields}]}}"#,
+                quote(&qualified)?,
+            )
+        }
+        Schema::Complex(ComplexType::Enum(en)) => {
+            let (qualified, _) = make_full_name(en.name, en.namespace, enclosing_ns);
+            let mut parts = Vec::new();
+            for symbol in en.symbols.iter() {
+                parts.push(quote(symbol)?);
+            }
+            let symbols = parts.join(",");
+            format!(
+                r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#,
+                quote(&qualified)?
+            )
+        }
+        Schema::Complex(ComplexType::Array(array)) => {
+            let items = build_canonical(&array.items, enclosing_ns)?;
+            format!(r#"{{"type":"array","items":{items}}}"#)
+        }
+        Schema::Complex(ComplexType::Map(map)) => {
+            let values = build_canonical(&map.values, enclosing_ns)?;
+            format!(r#"{{"type":"map","values":{values}}}"#)
+        }
+        Schema::Complex(ComplexType::Fixed(fixed)) => {
+            let (qualified, _) = make_full_name(fixed.name, fixed.namespace, enclosing_ns);
+            format!(
+                r#"{{"name":{},"type":"fixed","size":{}}}"#,
+                quote(&qualified)?,
+                fixed.size
+            )
+        }
+    };
+    Ok(rendered)
+}
+
+/// Starting value of the 64‑bit Rabin fingerprint described in the Avro spec;
+/// it is what `compute_fingerprint_rabin` returns for empty input. The same
+/// constant is the value `one_entry` XORs in while building the lookup table.
+const EMPTY: u64 = 0xc15d_213a_a4d7_a795;
+
+/// Computes entry `i` of the polynomial‑division lookup table.
+///
+/// Performs eight shift steps on `i`; each step XORs in `EMPTY` when the bit
+/// shifted out is set.
+///
+/// A `while` loop is used because `for` loops rely on `Iterator::next`, which
+/// is not `const` on stable Rust. Until the `const_for` feature (tracking
+/// issue #87575) is stabilized, `while` is the only loop available in a
+/// `const fn`.
+const fn one_entry(i: usize) -> u64 {
+    let mut entry = i as u64;
+    let mut remaining = 8;
+    while remaining > 0 {
+        let shifted = entry >> 1;
+        entry = if entry & 1 == 1 { shifted ^ EMPTY } else { shifted };
+        remaining -= 1;
+    }
+    entry
+}
+
+/// Build the full 256‑entry table at compile time.
+///
+/// Entry `i` is `one_entry(i)`, so the table holds one pre‑computed value per
+/// possible byte value.
+///
+/// We cannot yet write `for _ in 0..256` here: `for` loops rely on
+/// `Iterator::next`, which is not `const` on stable Rust.  Until the
+/// `const_for` feature (tracking issue #87575) is stabilized, a `while`
+/// loop is the only option in a `const fn`.
+const fn build_table() -> [u64; 256] {
+    let mut table = [0u64; 256];
+    let mut i = 0;
+    while i < 256 {
+        table[i] = one_entry(i);
+        i += 1;
+    }
+    table
+}
+
+/// The pre‑computed table, built by `build_table` and indexed by
+/// `compute_fingerprint_rabin` with a single byte value (0..=255).
+static FINGERPRINT_TABLE: [u64; 256] = build_table();
+
+/// Computes the 64-bit Rabin fingerprint of a canonical schema string.
+///
+/// Follows the table-driven algorithm from the Avro specification's section on
+/// schema fingerprinting: starting from `EMPTY`, each input byte selects a
+/// table entry that is folded into the running value.
+pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 {
+    canonical_form.bytes().fold(EMPTY, |acc, byte| {
+        // Only the low byte of the running value takes part in the table index,
+        // so the truncating cast is intentional.
+        let slot = usize::from((acc as u8) ^ byte);
+        (acc >> 8) ^ FINGERPRINT_TABLE[slot]
+    })
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -267,6 +654,34 @@ mod tests {
     use arrow_schema::{DataType, Fields, TimeUnit};
     use serde_json::json;
 
+    /// Test fixture: the primitive `int` schema.
+    fn int_schema() -> Schema<'static> {
+        Schema::TypeName(TypeName::Primitive(PrimitiveType::Int))
+    }
+
+    /// Test fixture: record `record1` in namespace `test.namespace` with an
+    /// `int` field and a `string` field. The `doc` values are set so tests can
+    /// check that they do not appear in the canonical form.
+    fn record_schema() -> Schema<'static> {
+        Schema::Complex(ComplexType::Record(Record {
+            name: "record1",
+            namespace: Some("test.namespace"),
+            doc: Some("A test record"),
+            aliases: vec![],
+            fields: vec![
+                Field {
+                    name: "field1",
+                    doc: Some("An integer field"),
+                    r#type: int_schema(),
+                    default: None,
+                },
+                Field {
+                    name: "field2",
+                    doc: None,
+                    r#type: 
Schema::TypeName(TypeName::Primitive(PrimitiveType::String)),
+                    default: None,
+                },
+            ],
+            attributes: Attributes::default(),
+        }))
+    }
+
     #[test]
     fn test_deserialize() {
         let t: Schema = serde_json::from_str("\"string\"").unwrap();
@@ -562,4 +977,147 @@ mod tests {
             }))
         );
     }
+
+    #[test]
+    fn test_new_schema_store() {
+        // A freshly created store holds no schemas.
+        let store = SchemaStore::new();
+        assert!(store.schemas.is_empty());
+    }
+
+    #[test]
+    fn test_try_from_schemas_rabin() {
+        // Every schema passed to `try_from` must be retrievable by its own fingerprint.
+        let int_avro_schema = 
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+        let record_avro_schema = 
AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
+        let schemas = vec![int_avro_schema.clone(), 
record_avro_schema.clone()];
+        let store = SchemaStore::try_from(schemas.as_slice()).unwrap();
+        let int_fp = int_avro_schema.fingerprint().unwrap();
+        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
+        let rec_fp = record_avro_schema.fingerprint().unwrap();
+        assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema));
+    }
+
+    #[test]
+    fn test_try_from_with_duplicates() {
+        // The int schema appears twice in the input but must be stored only once.
+        let int_avro_schema = 
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+        let record_avro_schema = 
AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
+        let schemas = vec![
+            int_avro_schema.clone(),
+            record_avro_schema,
+            int_avro_schema.clone(),
+        ];
+        let store = SchemaStore::try_from(schemas.as_slice()).unwrap();
+        assert_eq!(store.schemas.len(), 2);
+        let int_fp = int_avro_schema.fingerprint().unwrap();
+        assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema));
+    }
+
+    #[test]
+    fn test_register_and_lookup_rabin() {
+        // A registered schema is found under its fingerprint; a different
+        // fingerprint value (here: the value plus one) finds nothing.
+        let mut store = SchemaStore::new();
+        let schema = 
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+        let fp_enum = store.register(schema.clone()).unwrap();
+        let Fingerprint::Rabin(fp_val) = fp_enum;
+        assert_eq!(
+            store.lookup(&Fingerprint::Rabin(fp_val)).cloned(),
+            Some(schema.clone())
+        );
+        assert!(store
+            .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1)))
+            .is_none());
+    }
+
+    #[test]
+    fn test_register_duplicate_schema() {
+        // Registering the same JSON twice succeeds both times, yields the same
+        // fingerprint, and leaves a single entry in the store.
+        let mut store = SchemaStore::new();
+        let schema1 = 
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+        let schema2 = 
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+        let fingerprint1 = store.register(schema1).unwrap();
+        let fingerprint2 = store.register(schema2).unwrap();
+        assert_eq!(fingerprint1, fingerprint2);
+        assert_eq!(store.schemas.len(), 1);
+    }
+
+    #[test]
+    fn test_canonical_form_generation_primitive() {
+        // A primitive renders as its quoted lowercase type name.
+        let schema = int_schema();
+        let canonical_form = generate_canonical_form(&schema).unwrap();
+        assert_eq!(canonical_form, r#""int""#);
+    }
+
+    #[test]
+    fn test_canonical_form_generation_record() {
+        // The record name is namespace-qualified and the `doc` values set in
+        // the fixture are absent from the canonical form.
+        let schema = record_schema();
+        let expected_canonical_form = 
r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
+        let canonical_form = generate_canonical_form(&schema).unwrap();
+        assert_eq!(canonical_form, expected_canonical_form);
+    }
+
+    #[test]
+    fn test_fingerprint_calculation() {
+        // Pins the hash of a fixed input string. NOTE(review): the key order
+        // here ("fields" first) differs from what `build_canonical` emits
+        // ("name", "type", "fields"); the hash function accepts any string, so
+        // this checks the hashing only, not canonicalization.
+        let canonical_form = 
r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#;
+        let expected_fingerprint = 10505236152925314060;
+        let fingerprint = compute_fingerprint_rabin(canonical_form);
+        assert_eq!(fingerprint, expected_fingerprint);
+    }
+
+    #[test]
+    fn test_register_and_lookup_complex_schema() {
+        // The fingerprint returned by `register` must equal the Rabin hash of
+        // the record's expected canonical form, and lookup must return the
+        // registered schema.
+        let mut store = SchemaStore::new();
+        let schema = 
AvroSchema::new(serde_json::to_string(&record_schema()).unwrap());
+        let canonical_form = 
r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#;
+        let expected_fingerprint =
+            
Fingerprint::Rabin(super::compute_fingerprint_rabin(canonical_form));
+        let fingerprint = store.register(schema.clone()).unwrap();
+        assert_eq!(fingerprint, expected_fingerprint);
+        let looked_up = store.lookup(&fingerprint).cloned();
+        assert_eq!(looked_up, Some(schema));
+    }
+
+    #[test]
+    fn test_fingerprints_returns_all_keys() {
+        // `fingerprints()` returns one entry per registered schema; order is
+        // not checked, only membership.
+        let mut store = SchemaStore::new();
+        let fp_int = store
+            .register(AvroSchema::new(
+                serde_json::to_string(&int_schema()).unwrap(),
+            ))
+            .unwrap();
+        let fp_record = store
+            .register(AvroSchema::new(
+                serde_json::to_string(&record_schema()).unwrap(),
+            ))
+            .unwrap();
+        let fps = store.fingerprints();
+        assert_eq!(fps.len(), 2);
+        assert!(fps.contains(&fp_int));
+        assert!(fps.contains(&fp_record));
+    }
+
+    #[test]
+    fn test_canonical_form_strips_attributes() {
+        // `doc`, `aliases`, the logical type and custom attributes must all be
+        // absent from the canonical form; only name, type and fields remain.
+        let schema_with_attrs = Schema::Complex(ComplexType::Record(Record {
+            name: "record_with_attrs",
+            namespace: None,
+            doc: Some("This doc should be stripped"),
+            aliases: vec!["alias1", "alias2"],
+            fields: vec![Field {
+                name: "f1",
+                doc: Some("field doc"),
+                r#type: Schema::Type(Type {
+                    r#type: TypeName::Primitive(PrimitiveType::Bytes),
+                    attributes: Attributes {
+                        logical_type: Some("decimal"),
+                        additional: HashMap::from([("precision", json!(4))]),
+                    },
+                }),
+                default: None,
+            }],
+            attributes: Attributes {
+                logical_type: None,
+                additional: HashMap::from([("custom_attr", json!("value"))]),
+            },
+        }));
+        let expected_canonical_form = 
r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#;
+        let canonical_form = 
generate_canonical_form(&schema_with_attrs).unwrap();
+        assert_eq!(canonical_form, expected_canonical_form);
+    }
 }

Reply via email to