This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 4a21443d2f Implement arrow-avro SchemaStore and Fingerprinting To Enable Schema Resolution (#8006)
4a21443d2f is described below

commit 4a21443d2f18907f0fece066c8877afce6007550
Author: Connor Sanders <[email protected]>
AuthorDate: Thu Aug 7 15:33:02 2025 -0500

    Implement arrow-avro SchemaStore and Fingerprinting To Enable Schema Resolution (#8006)
    
    # Which issue does this PR close?
    
    - Part of https://github.com/apache/arrow-rs/issues/4886
    
    - Follow up to https://github.com/apache/arrow-rs/pull/7834
    
    # Rationale for this change
    
    Apache Avro’s [single object
    encoding](https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding)
    prefixes every record with the marker `0xC3 0x01` followed by a `Rabin`
    [schema fingerprint](https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints)
    so that readers can identify the correct writer schema without carrying
    the full definition in each message.
    While the current `arrow‑avro` implementation can read container files,
    it cannot ingest these framed messages or handle streams where the
    writer schema changes over time.
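
    As a minimal sketch of the framing described above (not the arrow-avro
    implementation), the helpers below build and split a single-object
    message. The names `frame_message` and `split_message` are hypothetical;
    the 2-byte marker and the little-endian 64-bit fingerprint follow the
    layout used by `make_prefix` in the benchmark changes of this commit.

```rust
/// Two-byte marker that opens every single-object encoded message.
const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// Prepends the 10-byte header (marker + little-endian fingerprint) to an
/// already-encoded Avro datum. Hypothetical helper for illustration.
fn frame_message(fingerprint: u64, datum: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(10 + datum.len());
    out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
    out.extend_from_slice(&fingerprint.to_le_bytes());
    out.extend_from_slice(datum);
    out
}

/// Splits a framed message into its fingerprint and body. Returns `None`
/// when the marker is missing or the header is truncated.
fn split_message(msg: &[u8]) -> Option<(u64, &[u8])> {
    let rest = msg.strip_prefix(&SINGLE_OBJECT_MAGIC)?;
    let fp_bytes = rest.get(..8)?;
    let mut fp = [0u8; 8];
    fp.copy_from_slice(fp_bytes);
    Some((u64::from_le_bytes(fp), &rest[8..]))
}
```

    A reader would pass the returned fingerprint to a schema lookup and the
    body to a record decoder built for that schema.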
    
    The Avro specification recommends computing a 64‑bit CRC‑64‑AVRO (Rabin)
    fingerprint of the [parsing canonical form of a
    schema](https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas)
    to look up the `Schema` from a local schema store or registry.
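
    For reference, the CRC‑64‑AVRO computation can be sketched as follows.
    This is a direct Rust transcription of the table-driven reference
    algorithm given in the Avro specification, not the code added by this
    PR; the names `fp_table` and `rabin_fingerprint` are hypothetical, and
    the input is assumed to already be the schema's parsing canonical form
    encoded as UTF-8.

```rust
/// Seed value defined by the Avro specification; it is also the fingerprint
/// of the empty byte string.
const EMPTY: u64 = 0xc15d213aa4d7a795;

/// Builds the 256-entry lookup table used by the byte-at-a-time algorithm.
fn fp_table() -> [u64; 256] {
    let mut table = [0u64; 256];
    for i in 0..256 {
        let mut fp = i as u64;
        for _ in 0..8 {
            // XOR in the polynomial only when the low bit is set.
            fp = (fp >> 1) ^ (EMPTY & (fp & 1).wrapping_neg());
        }
        table[i] = fp;
    }
    table
}

/// Computes the 64-bit Rabin fingerprint of `data`.
/// The table is rebuilt on every call to keep the sketch short; real code
/// would compute it once.
fn rabin_fingerprint(data: &[u8]) -> u64 {
    let table = fp_table();
    let mut fp = EMPTY;
    for &b in data {
        fp = (fp >> 8) ^ table[((fp ^ b as u64) & 0xff) as usize];
    }
    fp
}
```

    The resulting `u64` is what gets written in little-endian order after
    the `C3 01` marker.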
    
    This PR introduces **`SchemaStore`** and **fingerprinting** to enable:
    
    * **Zero‑copy schema identification** for decoding streaming Avro
    messages published in single‑object format (e.g. Kafka, Pulsar, etc.)
    into Arrow.
    * **Dynamic schema evolution** by laying the foundation to resolve
    writer/reader schema differences on the fly.
    **NOTE:** Schema Resolution support in `Codec` and `RecordDecoder` is
    coming in the next PR.
    
    # What changes are included in this PR?
    
    | Area | Highlights |
    | --- | --- |
    | **`reader/mod.rs`** | Decoder now detects the `C3 01` prefix, extracts the fingerprint, looks up the writer schema in a `SchemaStore`, and switches to an LRU cached `RecordDecoder` without interrupting streaming; supports `static_store_mode` to skip the 2-byte peek for high‑throughput fixed‑schema pipelines. |
    | **`ReaderBuilder`** | New builder configuration methods: `.with_writer_schema_store`, `.with_active_fingerprint`, `.with_static_store_mode`, `.with_reader_schema`, `.with_max_decoder_cache_size`, with rigorous validation to prevent misconfiguration. |
    | **Unit tests** | New tests covering fingerprint generation, store registration/lookup, schema switching, unknown‑fingerprint errors, and interaction with UTF8‑view decoding. |
    | **Docs & Examples** | Extensive inline docs with examples on all new public methods / structs. |
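
    The prefix detection summarized in the `reader/mod.rs` row has three
    outcomes when input arrives in arbitrary chunks. The sketch below is a
    simplified, standalone model of that decision, assuming a Rabin
    fingerprint; the `Prefix` enum and `peek_prefix` function are
    hypothetical names and do not appear in the crate.

```rust
const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

#[derive(Debug, PartialEq, Eq)]
enum Prefix {
    /// The buffer does not start with the marker; treat it as record data.
    Absent,
    /// Not enough bytes yet to decide or to read the fingerprint.
    Incomplete,
    /// Full header present: fingerprint parsed, `consumed` bytes used.
    Complete { fingerprint: u64, consumed: usize },
}

fn peek_prefix(buf: &[u8]) -> Prefix {
    // Need at least the two marker bytes before deciding anything.
    let Some(magic) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
        return Prefix::Incomplete;
    };
    if magic != SINGLE_OBJECT_MAGIC {
        return Prefix::Absent;
    }
    // The 8-byte little-endian fingerprint follows the marker.
    match buf.get(2..10) {
        Some(bytes) => {
            let mut fp = [0u8; 8];
            fp.copy_from_slice(bytes);
            Prefix::Complete {
                fingerprint: u64::from_le_bytes(fp),
                consumed: 10,
            }
        }
        None => Prefix::Incomplete,
    }
}
```

    An incomplete header consumes no bytes, so the caller can retry with a
    longer buffer once more input is available.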
    
    ---
    
    # Are these changes tested?
    
    Yes.  New tests cover:
    
    1. **Fingerprinting** against the canonical examples from the Avro spec
    2. **`SchemaStore` behavior**: deduplication, duplicate registration, and
    lookup.
    3. **Decoder fast‑path** with `static_store_mode=true`, ensuring the
    prefix is treated as payload, the 2-byte peek is skipped, and no schema
    switch is attempted.
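
    The store behavior named in item 2 can be pictured with a toy model. This
    is only a sketch under stated assumptions: `ToySchemaStore` is a
    hypothetical type keyed by a raw `u64` fingerprint and holding schema
    JSON strings, and it does not reflect the actual `SchemaStore` API.

```rust
use std::collections::HashMap;

/// Hypothetical fingerprint-to-schema map used only for illustration.
#[derive(Debug, Default)]
struct ToySchemaStore {
    schemas: HashMap<u64, String>,
}

impl ToySchemaStore {
    /// Registers a schema under its fingerprint. Registering the same schema
    /// twice is a no-op; a different schema under an existing fingerprint is
    /// reported as a collision.
    fn register(&mut self, fingerprint: u64, schema_json: &str) -> Result<(), String> {
        if let Some(existing) = self.schemas.get(&fingerprint) {
            return if existing.as_str() == schema_json {
                Ok(())
            } else {
                Err(format!("fingerprint collision: {fingerprint:#018x}"))
            };
        }
        self.schemas.insert(fingerprint, schema_json.to_string());
        Ok(())
    }

    /// Returns the schema registered under `fingerprint`, if any.
    fn lookup(&self, fingerprint: u64) -> Option<&str> {
        self.schemas.get(&fingerprint).map(String::as_str)
    }
}
```

    A lookup miss corresponds to the unknown-fingerprint error path covered
    by the unit tests.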
    
    # Are there any user-facing changes?
    
    N/A
    
    # Follow-Up PRs
    
    1. Implement Schema Resolution Functionality in Codec and RecordDecoder
    2. Add ID `Fingerprint` variant on `SchemaStore` for Confluent Schema
    Registry compatibility
    3. Improve arrow-avro errors + add more benchmarks & examples to prepare
    for public release
    
    ---------
    
    Co-authored-by: Ryan Johnson <[email protected]>
---
 arrow-avro/Cargo.toml           |   4 +-
 arrow-avro/benches/decoder.rs   | 159 +++++++-----
 arrow-avro/src/codec.rs         |  51 +++-
 arrow-avro/src/reader/header.rs |   2 +-
 arrow-avro/src/reader/mod.rs    | 539 +++++++++++++++++++++++++++++++++-------
 5 files changed, 589 insertions(+), 166 deletions(-)

diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index d5c9dc184e..1a1fc2f066 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -55,8 +55,10 @@ zstd = { version = "0.13", default-features = false, 
optional = true }
 bzip2 = { version = "0.6.0", optional = true }
 xz = { version = "0.1", default-features = false, optional = true }
 crc = { version = "3.0", optional = true }
-uuid = "1.17"
 strum_macros = "0.27"
+uuid = "1.17"
+indexmap = "2.10"
+
 
 [dev-dependencies]
 arrow-data = { workspace = true }
diff --git a/arrow-avro/benches/decoder.rs b/arrow-avro/benches/decoder.rs
index 452f44e09e..df802daea1 100644
--- a/arrow-avro/benches/decoder.rs
+++ b/arrow-avro/benches/decoder.rs
@@ -27,58 +27,78 @@ extern crate uuid;
 
 use apache_avro::types::Value;
 use apache_avro::{to_avro_datum, Decimal, Schema as ApacheSchema};
-use arrow_avro::{reader::ReaderBuilder, schema::Schema as AvroSchema};
+use arrow_avro::schema::{Fingerprint, SINGLE_OBJECT_MAGIC};
+use arrow_avro::{reader::ReaderBuilder, schema::AvroSchema};
 use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId, 
Criterion, Throughput};
 use once_cell::sync::Lazy;
-use std::{hint::black_box, io, time::Duration};
+use std::{hint::black_box, time::Duration};
 use uuid::Uuid;
 
-fn encode_records(schema: &ApacheSchema, rows: impl Iterator<Item = Value>) -> 
Vec<u8> {
+fn make_prefix(fp: Fingerprint) -> [u8; 10] {
+    let Fingerprint::Rabin(val) = fp;
+    let mut buf = [0u8; 10];
+    buf[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
+    buf[2..].copy_from_slice(&val.to_le_bytes()); // little‑endian 64‑bit
+    buf
+}
+
+fn encode_records_with_prefix(
+    schema: &ApacheSchema,
+    prefix: &[u8],
+    rows: impl Iterator<Item = Value>,
+) -> Vec<u8> {
     let mut out = Vec::new();
     for v in rows {
+        out.extend_from_slice(prefix);
         out.extend_from_slice(&to_avro_datum(schema, v).expect("encode datum 
failed"));
     }
     out
 }
 
-fn gen_int(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_int(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as 
i32))])),
     )
 }
 
-fn gen_long(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_long(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long(i as 
i64))])),
     )
 }
 
-fn gen_float(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_float(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Float(i as 
f32 + 0.5678))])),
     )
 }
 
-fn gen_bool(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_bool(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Boolean(i % 
2 == 0))])),
     )
 }
 
-fn gen_double(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_double(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Double(i as 
f64 + 0.1234))])),
     )
 }
 
-fn gen_bytes(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_bytes(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let payload = vec![(i & 0xFF) as u8; 16];
             Value::Record(vec![("field1".into(), Value::Bytes(payload))])
@@ -86,9 +106,10 @@ fn gen_bytes(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_string(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let s = if i % 3 == 0 {
                 format!("value-{i}")
@@ -100,30 +121,34 @@ fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_date(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_date(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as 
i32))])),
     )
 }
 
-fn gen_timemillis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_timemillis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int((i * 
37) as i32))])),
     )
 }
 
-fn gen_timemicros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_timemicros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long((i * 
1_001) as i64))])),
     )
 }
 
-fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_ts_millis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             Value::Record(vec![(
                 "field1".into(),
@@ -133,9 +158,10 @@ fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_ts_micros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             Value::Record(vec![(
                 "field1".into(),
@@ -145,10 +171,11 @@ fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
+fn gen_map(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
     use std::collections::HashMap;
-    encode_records(
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let mut m = HashMap::new();
             let int_val = |v: i32| Value::Union(0, Box::new(Value::Int(v)));
@@ -165,9 +192,10 @@ fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_array(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_array(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let items = (0..5).map(|j| Value::Int(i as i32 + j)).collect();
             Value::Record(vec![("field1".into(), Value::Array(items))])
@@ -189,9 +217,10 @@ fn trim_i128_be(v: i128) -> Vec<u8> {
     full[first..].to_vec()
 }
 
-fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_decimal(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let unscaled = if i % 2 == 0 { i as i128 } else { -(i as i128) };
             Value::Record(vec![(
@@ -202,9 +231,10 @@ fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_uuid(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let mut raw = (i as u128).to_be_bytes();
             raw[6] = (raw[6] & 0x0F) | 0x40;
@@ -214,9 +244,10 @@ fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_fixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let mut buf = vec![0u8; 16];
             buf[..8].copy_from_slice(&(i as u64).to_be_bytes());
@@ -225,9 +256,10 @@ fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_interval(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let months = (i % 24) as u32;
             let days = (i % 32) as u32;
@@ -241,10 +273,11 @@ fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
+fn gen_enum(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
     const SYMBOLS: [&str; 3] = ["A", "B", "C"];
-    encode_records(
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let idx = i % 3;
             Value::Record(vec![(
@@ -255,9 +288,10 @@ fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_mixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             Value::Record(vec![
                 ("f1".into(), Value::Int(i as i32)),
@@ -269,9 +303,10 @@ fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
     )
 }
 
-fn gen_nested(sc: &ApacheSchema, n: usize) -> Vec<u8> {
-    encode_records(
+fn gen_nested(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+    encode_records_with_prefix(
         sc,
+        prefix,
         (0..n).map(|i| {
             let sub = Value::Record(vec![
                 ("x".into(), Value::Int(i as i32)),
@@ -290,12 +325,14 @@ fn new_decoder(
     batch_size: usize,
     utf8view: bool,
 ) -> arrow_avro::reader::Decoder {
-    let schema: AvroSchema<'static> = 
serde_json::from_str(schema_json).unwrap();
+    let schema = AvroSchema::new(schema_json.parse().unwrap());
+    let mut store = arrow_avro::schema::SchemaStore::new();
+    store.register(schema.clone()).unwrap();
     ReaderBuilder::new()
-        .with_schema(schema)
+        .with_writer_schema_store(store)
         .with_batch_size(batch_size)
         .with_utf8_view(utf8view)
-        .build_decoder(io::empty())
+        .build_decoder()
         .expect("failed to build decoder")
 }
 
@@ -325,8 +362,8 @@ const ARRAY_SCHEMA: &str = 
r#"{"type":"record","name":"ArrRec","fields":[{"name"
 const DECIMAL_SCHEMA: &str = 
r#"{"type":"record","name":"DecRec","fields":[{"name":"field1","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":3}}]}"#;
 const UUID_SCHEMA: &str = 
r#"{"type":"record","name":"UuidRec","fields":[{"name":"field1","type":{"type":"string","logicalType":"uuid"}}]}"#;
 const FIXED_SCHEMA: &str = 
r#"{"type":"record","name":"FixRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Fixed16","size":16}}]}"#;
-const INTERVAL_SCHEMA_ENCODE: &str = 
r#"{"type":"record","name":"DurRecEnc","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
 const INTERVAL_SCHEMA: &str = 
r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12,"logicalType":"duration"}}]}"#;
+const INTERVAL_SCHEMA_ENCODE: &str = 
r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
 const ENUM_SCHEMA: &str = 
r#"{"type":"record","name":"EnumRec","fields":[{"name":"field1","type":{"type":"enum","name":"MyEnum","symbols":["A","B","C"]}}]}"#;
 const MIX_SCHEMA: &str = 
r#"{"type":"record","name":"MixRec","fields":[{"name":"f1","type":"int"},{"name":"f2","type":"long"},{"name":"f3","type":"string"},{"name":"f4","type":"double"}]}"#;
 const NEST_SCHEMA: &str = 
r#"{"type":"record","name":"NestRec","fields":[{"name":"sub","type":{"type":"record","name":"Sub","fields":[{"name":"x","type":"int"},{"name":"y","type":"string"}]}}]}"#;
@@ -336,7 +373,13 @@ macro_rules! dataset {
         static $name: Lazy<Vec<Vec<u8>>> = Lazy::new(|| {
             let schema =
                 ApacheSchema::parse_str($schema_json).expect("invalid schema 
for generator");
-            SIZES.iter().map(|&n| $gen_fn(&schema, n)).collect()
+            let arrow_schema = AvroSchema::new($schema_json.to_string());
+            let fingerprint = arrow_schema.fingerprint().expect("fingerprint 
failed");
+            let prefix = make_prefix(fingerprint);
+            SIZES
+                .iter()
+                .map(|&n| $gen_fn(&schema, n, &prefix))
+                .collect()
         });
     };
 }
@@ -406,6 +449,14 @@ fn bench_scenario(
 
 fn criterion_benches(c: &mut Criterion) {
     for &batch_size in &[SMALL_BATCH, LARGE_BATCH] {
+        bench_scenario(
+            c,
+            "Interval",
+            INTERVAL_SCHEMA,
+            &INTERVAL_DATA,
+            false,
+            batch_size,
+        );
         bench_scenario(c, "Int32", INT_SCHEMA, &INT_DATA, false, batch_size);
         bench_scenario(c, "Int64", LONG_SCHEMA, &LONG_DATA, false, batch_size);
         bench_scenario(c, "Float32", FLOAT_SCHEMA, &FLOAT_DATA, false, 
batch_size);
@@ -480,14 +531,6 @@ fn criterion_benches(c: &mut Criterion) {
             false,
             batch_size,
         );
-        bench_scenario(
-            c,
-            "Interval",
-            INTERVAL_SCHEMA,
-            &INTERVAL_DATA,
-            false,
-            batch_size,
-        );
         bench_scenario(
             c,
             "Enum(Dictionary)",
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index d4bba9a1ff..dcd3984501 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -15,10 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema, 
TypeName};
+use crate::schema::{Attributes, AvroSchema, ComplexType, PrimitiveType, 
Record, Schema, TypeName};
 use arrow_schema::{
-    ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit, 
SchemaBuilder, SchemaRef,
-    TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
+    ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, 
DECIMAL128_MAX_PRECISION,
+    DECIMAL128_MAX_SCALE,
 };
 use std::borrow::Cow;
 use std::collections::HashMap;
@@ -139,6 +139,22 @@ impl AvroField {
     pub fn name(&self) -> &str {
         &self.name
     }
+
+    /// Performs schema resolution between a writer and reader schema.
+    ///
+    /// This is the primary entry point for handling schema evolution. It 
produces an
+    /// `AvroField` that contains all the necessary information to read data 
written
+    /// with the `writer` schema as if it were written with the `reader` 
schema.
+    pub(crate) fn resolve_from_writer_and_reader<'a>(
+        writer_schema: &'a Schema<'a>,
+        reader_schema: &'a Schema<'a>,
+        use_utf8view: bool,
+        strict_mode: bool,
+    ) -> Result<Self, ArrowError> {
+        Err(ArrowError::NotYetImplemented(
+            "Resolving schema from a writer and reader schema is not yet 
implemented".to_string(),
+        ))
+    }
 }
 
 impl<'a> TryFrom<&Schema<'a>> for AvroField {
@@ -164,21 +180,33 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
 /// Builder for an [`AvroField`]
 #[derive(Debug)]
 pub struct AvroFieldBuilder<'a> {
-    schema: &'a Schema<'a>,
+    writer_schema: &'a Schema<'a>,
+    reader_schema: Option<AvroSchema>,
     use_utf8view: bool,
     strict_mode: bool,
 }
 
 impl<'a> AvroFieldBuilder<'a> {
-    /// Creates a new [`AvroFieldBuilder`]
-    pub fn new(schema: &'a Schema<'a>) -> Self {
+    /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
+    pub fn new(writer_schema: &'a Schema<'a>) -> Self {
         Self {
-            schema,
+            writer_schema,
+            reader_schema: None,
             use_utf8view: false,
             strict_mode: false,
         }
     }
 
+    /// Sets the reader schema for schema resolution.
+    ///
+    /// If a reader schema is provided, the builder will produce a resolved 
`AvroField`
+    /// that can handle differences between the writer's and reader's schemas.
+    #[inline]
+    pub fn with_reader_schema(mut self, reader_schema: AvroSchema) -> Self {
+        self.reader_schema = Some(reader_schema);
+        self
+    }
+
     /// Enable or disable Utf8View support
     pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
         self.use_utf8view = use_utf8view;
@@ -193,11 +221,11 @@ impl<'a> AvroFieldBuilder<'a> {
 
     /// Build an [`AvroField`] from the builder
     pub fn build(self) -> Result<AvroField, ArrowError> {
-        match self.schema {
+        match self.writer_schema {
             Schema::Complex(ComplexType::Record(r)) => {
                 let mut resolver = Resolver::default();
                 let data_type = make_data_type(
-                    self.schema,
+                    self.writer_schema,
                     None,
                     &mut resolver,
                     self.use_utf8view,
@@ -210,11 +238,12 @@ impl<'a> AvroFieldBuilder<'a> {
             }
             _ => Err(ArrowError::ParseError(format!(
                 "Expected a Record schema to build an AvroField, but got {:?}",
-                self.schema
+                self.writer_schema
             ))),
         }
     }
 }
+
 /// An Avro encoding
 ///
 /// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
@@ -446,7 +475,7 @@ impl<'a> Resolver<'a> {
     }
 }
 
-/// Parses a [`AvroDataType`] from the provided [`Schema`] and the given 
`name` and `namespace`
+/// Parses a [`AvroDataType`] from the provided `schema` and the given `name` 
and `namespace`
 ///
 /// `name`: is name used to refer to `schema` in its parent
 /// `namespace`: an optional qualifier used as part of a type hierarchy
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index 0f7ffd3f8d..2d26df07aa 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -92,7 +92,7 @@ impl Header {
     }
 
     /// Returns the [`Schema`] if any
-    pub fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
+    pub(crate) fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
         self.get(SCHEMA_METADATA_KEY)
             .map(|x| {
                 serde_json::from_slice(x).map_err(|e| {
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 18bc498cd2..e9bf7af61e 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -90,13 +90,18 @@
 //! ```
 //!
 
-use crate::codec::AvroFieldBuilder;
-use crate::schema::Schema as AvroSchema;
-use arrow_array::{RecordBatch, RecordBatchReader};
+use crate::codec::{AvroField, AvroFieldBuilder};
+use crate::schema::{
+    compare_schemas, generate_fingerprint, AvroSchema, Fingerprint, 
FingerprintAlgorithm, Schema,
+    SchemaStore, SINGLE_OBJECT_MAGIC,
+};
+use arrow_array::{Array, RecordBatch, RecordBatchReader};
 use arrow_schema::{ArrowError, SchemaRef};
 use block::BlockDecoder;
 use header::{Header, HeaderDecoder};
+use indexmap::IndexMap;
 use record::RecordDecoder;
+use std::collections::HashMap;
 use std::io::BufRead;
 
 mod block;
@@ -128,23 +133,22 @@ fn read_header<R: BufRead>(mut reader: R) -> 
Result<Header, ArrowError> {
 /// A low-level interface for decoding Avro-encoded bytes into Arrow 
`RecordBatch`.
 #[derive(Debug)]
 pub struct Decoder {
-    record_decoder: RecordDecoder,
+    active_decoder: RecordDecoder,
+    active_fingerprint: Option<Fingerprint>,
     batch_size: usize,
-    decoded_rows: usize,
+    remaining_capacity: usize,
+    cache: IndexMap<Fingerprint, RecordDecoder>,
+    fingerprint_algorithm: FingerprintAlgorithm,
+    expect_prefix: bool,
+    utf8_view: bool,
+    strict_mode: bool,
+    pending_schema: Option<(Fingerprint, RecordDecoder)>,
 }
 
 impl Decoder {
-    fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
-        Self {
-            record_decoder,
-            batch_size,
-            decoded_rows: 0,
-        }
-    }
-
     /// Return the Arrow schema for the rows decoded by this decoder
     pub fn schema(&self) -> SchemaRef {
-        self.record_decoder.schema().clone()
+        self.active_decoder.schema().clone()
     }
 
     /// Return the configured maximum number of rows per batch
@@ -158,39 +162,125 @@ impl Decoder {
     ///
     /// Returns the number of bytes consumed.
     pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+        if self.expect_prefix
+            && data.len() >= SINGLE_OBJECT_MAGIC.len()
+            && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+        {
+            return Err(ArrowError::ParseError(
+                "Expected single‑object encoding fingerprint prefix for first 
message \
+                 (writer_schema_store is set but active_fingerprint is None)"
+                    .into(),
+            ));
+        }
         let mut total_consumed = 0usize;
-        while total_consumed < data.len() && self.decoded_rows < 
self.batch_size {
-            let consumed = self.record_decoder.decode(&data[total_consumed..], 
1)?;
-            // A successful call to record_decoder.decode means one row was 
decoded.
-            // If `consumed` is 0 on a non-empty buffer, it implies a valid 
zero-byte record.
-            // We increment `decoded_rows` to mark progress and avoid an 
infinite loop.
-            // We add `consumed` (which can be 0) to `total_consumed`.
-            total_consumed += consumed;
-            self.decoded_rows += 1;
+        // The loop stops when the batch is full, a schema change is staged,
+        // or handle_prefix indicates we need more bytes (Some(0)).
+        while total_consumed < data.len() && self.remaining_capacity > 0 {
+            if let Some(n) = self.handle_prefix(&data[total_consumed..])? {
+                // We either consumed a prefix (n > 0) and need a schema 
switch, or we need
+                // more bytes to make a decision. Either way, this decoding 
attempt is finished.
+                total_consumed += n;
+            }
+            // No prefix: decode one row and keep going.
+            let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+            self.remaining_capacity -= 1;
+            total_consumed += n;
         }
         Ok(total_consumed)
     }
 
+    // Attempt to handle a single‑object‑encoding prefix at the current 
position.
+    //
+    // * Ok(None) – buffer does not start with the prefix.
+    // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller 
should await more bytes.
+    // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and 
fingerprint).
+    fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>, 
ArrowError> {
+        // If there is no schema store, prefixes are unrecognized.
+        if !self.expect_prefix {
+            return Ok(None);
+        }
+        // Need at least the magic bytes to decide (2 bytes).
+        let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+            return Ok(Some(0)); // Get more bytes
+        };
+        // Bail out early if the magic does not match.
+        if magic_bytes != SINGLE_OBJECT_MAGIC {
+            return Ok(None); // Continue to decode the next record
+        }
+        // Try to parse the fingerprint that follows the magic.
+        let fingerprint_size = match self.fingerprint_algorithm {
+            FingerprintAlgorithm::Rabin => self
+                .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes| 
{
+                    Fingerprint::Rabin(u64::from_le_bytes(bytes))
+                })?,
+        };
+        // Convert the inner result into a “bytes consumed” count.
+        // NOTE: Incomplete fingerprint consumes no bytes.
+        let consumed = fingerprint_size.map_or(0, |n| n + 
SINGLE_OBJECT_MAGIC.len());
+        Ok(Some(consumed))
+    }
+
+    // Attempts to read and install a new fingerprint of `N` bytes.
+    //
+    // * Ok(None) – insufficient bytes (`buf.len() < `N`).
+    // * Ok(Some(N)) – fingerprint consumed (always `N`).
+    fn handle_fingerprint<const N: usize>(
+        &mut self,
+        buf: &[u8],
+        fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+    ) -> Result<Option<usize>, ArrowError> {
+        // Need enough bytes to get fingerprint (next N bytes)
+        let Some(fingerprint_bytes) = buf.get(..N) else {
+            return Ok(None); // Insufficient bytes
+        };
+        // SAFETY: length checked above.
+        let new_fingerprint = 
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+        // If the fingerprint indicates a schema change, prepare to switch 
decoders.
+        if self.active_fingerprint != Some(new_fingerprint) {
+            let Some(new_decoder) = self.cache.shift_remove(&new_fingerprint) 
else {
+                return Err(ArrowError::ParseError(format!(
+                    "Unknown fingerprint: {new_fingerprint:?}"
+                )));
+            };
+            self.pending_schema = Some((new_fingerprint, new_decoder));
+            // If there are already decoded rows, we must flush them first.
+            // Reducing `remaining_capacity` to 0 ensures `flush` is called 
next.
+            if self.remaining_capacity < self.batch_size {
+                self.remaining_capacity = 0;
+            }
+        }
+        Ok(Some(N))
+    }
+
     /// Produce a `RecordBatch` if at least one row is fully decoded, returning
     /// `Ok(None)` if no new rows are available.
     pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        if self.decoded_rows == 0 {
-            Ok(None)
-        } else {
-            let batch = self.record_decoder.flush()?;
-            self.decoded_rows = 0;
-            Ok(Some(batch))
+        if self.remaining_capacity == self.batch_size {
+            return Ok(None);
+        }
+        let batch = self.active_decoder.flush()?;
+        self.remaining_capacity = self.batch_size;
+        // Apply any staged schema switch.
+        if let Some((new_fingerprint, new_decoder)) = 
self.pending_schema.take() {
+            if let Some(old_fingerprint) = 
self.active_fingerprint.replace(new_fingerprint) {
+                let old_decoder = std::mem::replace(&mut self.active_decoder, 
new_decoder);
+                self.cache.shift_remove(&old_fingerprint);
+                self.cache.insert(old_fingerprint, old_decoder);
+            } else {
+                self.active_decoder = new_decoder;
+            }
         }
+        Ok(Some(batch))
     }
 
     /// Returns the number of rows that can be added to this decoder before it 
is full.
     pub fn capacity(&self) -> usize {
-        self.batch_size.saturating_sub(self.decoded_rows)
+        self.remaining_capacity
     }
 
     /// Returns true if the decoder has reached its capacity for the current batch.
     pub fn batch_is_full(&self) -> bool {
-        self.capacity() == 0
+        self.remaining_capacity == 0
     }
 }
 
@@ -201,7 +291,9 @@ pub struct ReaderBuilder {
     batch_size: usize,
     strict_mode: bool,
     utf8_view: bool,
-    schema: Option<AvroSchema<'static>>,
+    reader_schema: Option<AvroSchema>,
+    writer_schema_store: Option<SchemaStore>,
+    active_fingerprint: Option<Fingerprint>,
 }
 
 impl Default for ReaderBuilder {
@@ -210,7 +302,9 @@ impl Default for ReaderBuilder {
             batch_size: 1024,
             strict_mode: false,
             utf8_view: false,
-            schema: None,
+            reader_schema: None,
+            writer_schema_store: None,
+            active_fingerprint: None,
         }
     }
 }
@@ -220,34 +314,118 @@ impl ReaderBuilder {
     /// - `batch_size` = 1024
     /// - `strict_mode` = false
     /// - `utf8_view` = false
-    /// - `schema` = None
+    /// - `reader_schema` = None
+    /// - `writer_schema_store` = None
+    /// - `active_fingerprint` = None
     pub fn new() -> Self {
         Self::default()
     }
 
-    fn make_record_decoder(&self, schema: &AvroSchema<'_>) -> Result<RecordDecoder, ArrowError> {
-        let root_field = AvroFieldBuilder::new(schema)
+    fn make_record_decoder(
+        &self,
+        writer_schema: &Schema,
+        reader_schema: Option<&AvroSchema>,
+    ) -> Result<RecordDecoder, ArrowError> {
+        let mut builder = AvroFieldBuilder::new(writer_schema);
+        if let Some(reader_schema) = reader_schema {
+            builder = builder.with_reader_schema(reader_schema.clone());
+        }
+        let root = builder
             .with_utf8view(self.utf8_view)
             .with_strict_mode(self.strict_mode)
             .build()?;
-        RecordDecoder::try_new_with_options(root_field.data_type(), self.utf8_view)
+        RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
     }
 
-    fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header, Decoder), ArrowError> {
-        let header = read_header(reader)?;
-        let record_decoder = if let Some(schema) = &self.schema {
-            self.make_record_decoder(schema)?
-        } else {
-            let avro_schema: Option<AvroSchema<'_>> = header
+    fn make_decoder_with_parts(
+        &self,
+        active_decoder: RecordDecoder,
+        active_fingerprint: Option<Fingerprint>,
+        cache: IndexMap<Fingerprint, RecordDecoder>,
+        expect_prefix: bool,
+        fingerprint_algorithm: FingerprintAlgorithm,
+    ) -> Decoder {
+        Decoder {
+            batch_size: self.batch_size,
+            remaining_capacity: self.batch_size,
+            active_fingerprint,
+            active_decoder,
+            cache,
+            expect_prefix,
+            utf8_view: self.utf8_view,
+            fingerprint_algorithm,
+            strict_mode: self.strict_mode,
+            pending_schema: None,
+        }
+    }
+
+    fn make_decoder(
+        &self,
+        header: Option<&Header>,
+        reader_schema: Option<&AvroSchema>,
+    ) -> Result<Decoder, ArrowError> {
+        if let Some(hdr) = header {
+            let writer_schema = hdr
                 .schema()
-                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
-            let avro_schema = avro_schema.ok_or_else(|| {
-                ArrowError::ParseError("No Avro schema present in file header".to_string())
+                .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+                .ok_or_else(|| {
+                    ArrowError::ParseError("No Avro schema present in file header".into())
+                })?;
+            let record_decoder = self.make_record_decoder(&writer_schema, reader_schema)?;
+            return Ok(self.make_decoder_with_parts(
+                record_decoder,
+                None,
+                IndexMap::new(),
+                false,
+                FingerprintAlgorithm::Rabin,
+            ));
+        }
+        let store = self.writer_schema_store.as_ref().ok_or_else(|| {
+            ArrowError::ParseError("Writer schema store required for raw Avro".into())
+        })?;
+        let fingerprints = store.fingerprints();
+        if fingerprints.is_empty() {
+            return Err(ArrowError::ParseError(
+                "Writer schema store must contain at least one schema".into(),
+            ));
+        }
+        let start_fingerprint = self
+            .active_fingerprint
+            .or_else(|| fingerprints.first().copied())
+            .ok_or_else(|| {
+                ArrowError::ParseError("Could not determine initial schema fingerprint".into())
             })?;
-            self.make_record_decoder(&avro_schema)?
-        };
-        let decoder = Decoder::new(record_decoder, self.batch_size);
-        Ok((header, decoder))
+        let mut cache = IndexMap::with_capacity(fingerprints.len().saturating_sub(1));
+        let mut active_decoder: Option<RecordDecoder> = None;
+        for fingerprint in store.fingerprints() {
+            let avro_schema = match store.lookup(&fingerprint) {
+                Some(schema) => schema,
+                None => {
+                    return Err(ArrowError::ComputeError(format!(
+                        "Fingerprint {fingerprint:?} not found in schema store",
+                    )));
+                }
+            };
+            let writer_schema = avro_schema.schema()?;
+            let decoder = self.make_record_decoder(&writer_schema, reader_schema)?;
+            if fingerprint == start_fingerprint {
+                active_decoder = Some(decoder);
+            } else {
+                cache.insert(fingerprint, decoder);
+            }
+        }
+        let active_decoder = active_decoder.ok_or_else(|| {
+            ArrowError::ComputeError(format!(
+                "Initial fingerprint {start_fingerprint:?} not found in schema store"
+            ))
+        })?;
+        Ok(self.make_decoder_with_parts(
+            active_decoder,
+            Some(start_fingerprint),
+            cache,
+            true,
+            store.fingerprint_algorithm(),
+        ))
     }
 
     /// Sets the row-based batch size
@@ -276,17 +454,42 @@ impl ReaderBuilder {
         self
     }
 
-    /// Sets the Avro schema.
+    /// Sets the Avro reader schema.
     ///
     /// If a schema is not provided, the schema will be read from the Avro file header.
-    pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
-        self.schema = Some(schema);
+    pub fn with_reader_schema(mut self, schema: AvroSchema) -> Self {
+        self.reader_schema = Some(schema);
+        self
+    }
+
+    /// Sets the `SchemaStore` used for resolving writer schemas.
+    ///
+    /// This is necessary when decoding single-object encoded data that identifies
+    /// schemas by a fingerprint. The store allows the decoder to look up the
+    /// full writer schema from a fingerprint embedded in the data.
+    ///
+    /// Defaults to `None`.
+    pub fn with_writer_schema_store(mut self, store: SchemaStore) -> Self {
+        self.writer_schema_store = Some(store);
+        self
+    }
+
+    /// Sets the initial schema fingerprint for decoding single-object encoded data.
+    ///
+    /// This is useful when the data stream does not begin with a schema definition
+    /// or fingerprint, allowing the decoder to start with a known schema from the
+    /// `SchemaStore`.
+    ///
+    /// Defaults to `None`.
+    pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
+        self.active_fingerprint = Some(fp);
         self
     }
 
     /// Create a [`Reader`] from this builder and a `BufRead`
     pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
-        let (header, decoder) = self.build_impl(&mut reader)?;
+        let header = read_header(&mut reader)?;
+        let decoder = self.make_decoder(Some(&header), self.reader_schema.as_ref())?;
         Ok(Reader {
             reader,
             header,
@@ -298,20 +501,14 @@ impl ReaderBuilder {
         })
     }
 
-    /// Create a [`Decoder`] from this builder and a `BufRead` by
-    /// reading and parsing the Avro file's header. This will
-    /// not create a full [`Reader`].
-    pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder, ArrowError> {
-        match self.schema {
-            Some(ref schema) => {
-                let record_decoder = self.make_record_decoder(schema)?;
-                Ok(Decoder::new(record_decoder, self.batch_size))
-            }
-            None => {
-                let (_, decoder) = self.build_impl(&mut reader)?;
-                Ok(decoder)
-            }
+    /// Create a [`Decoder`] from this builder.
+    pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
+        if self.writer_schema_store.is_none() {
+            return Err(ArrowError::InvalidArgumentError(
+                "Building a decoder requires a writer schema store".to_string(),
+            ));
         }
+        self.make_decoder(None, self.reader_schema.as_ref())
     }
 }
 
@@ -391,11 +588,15 @@ impl<R: BufRead> RecordBatchReader for Reader<R> {
 
 #[cfg(test)]
 mod test {
-    use crate::codec::{AvroDataType, AvroField, Codec};
+    use crate::codec::{AvroDataType, AvroField, AvroFieldBuilder, Codec};
     use crate::compression::CompressionCodec;
     use crate::reader::record::RecordDecoder;
     use crate::reader::vlq::VLQDecoder;
     use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
+    use crate::schema::{
+        AvroSchema, Fingerprint, FingerprintAlgorithm, PrimitiveType, Schema as AvroRaw,
+        SchemaStore, SINGLE_OBJECT_MAGIC,
+    };
     use crate::test_util::arrow_test_data;
     use arrow::array::ArrayDataBuilder;
     use arrow_array::builder::{
@@ -433,7 +634,7 @@ mod test {
         batch_size: usize,
         utf8_view: bool,
     ) -> Result<Reader<BufReader<File>>, ArrowError> {
-        let file = File::open(path).unwrap();
+        let file = File::open(path)?;
         ReaderBuilder::new()
             .with_batch_size(batch_size)
             .with_utf8_view(utf8_view)
@@ -460,6 +661,160 @@ mod test {
         }
     }
 
+    fn make_record_schema(pt: PrimitiveType) -> AvroSchema {
+        let js = format!(
+            r#"{{"type":"record","name":"TestRecord","fields":[{{"name":"a","type":"{}"}}]}}"#,
+            pt.as_ref()
+        );
+        AvroSchema::new(js)
+    }
+
+    fn make_two_schema_store() -> (
+        SchemaStore,
+        Fingerprint,
+        Fingerprint,
+        AvroSchema,
+        AvroSchema,
+    ) {
+        let schema_int = make_record_schema(PrimitiveType::Int);
+        let schema_long = make_record_schema(PrimitiveType::Long);
+        let mut store = SchemaStore::new();
+        let fp_int = store
+            .register(schema_int.clone())
+            .expect("register int schema");
+        let fp_long = store
+            .register(schema_long.clone())
+            .expect("register long schema");
+        (store, fp_int, fp_long, schema_int, schema_long)
+    }
+
+    fn make_prefix(fp: Fingerprint) -> Vec<u8> {
+        match fp {
+            Fingerprint::Rabin(v) => {
+                let mut out = Vec::with_capacity(2 + 8);
+                out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
+                out.extend_from_slice(&v.to_le_bytes());
+                out
+            }
+        }
+    }
+
+    fn make_decoder(store: &SchemaStore, fp: Fingerprint, reader_schema: &AvroSchema) -> Decoder {
+        ReaderBuilder::new()
+            .with_batch_size(8)
+            .with_reader_schema(reader_schema.clone())
+            .with_writer_schema_store(store.clone())
+            .with_active_fingerprint(fp)
+            .build_decoder()
+            .expect("decoder")
+    }
+
+    #[test]
+    fn test_schema_store_register_lookup() {
+        let schema_int = make_record_schema(PrimitiveType::Int);
+        let schema_long = make_record_schema(PrimitiveType::Long);
+        let mut store = SchemaStore::new();
+        let fp_int = store.register(schema_int.clone()).unwrap();
+        let fp_long = store.register(schema_long.clone()).unwrap();
+        assert_eq!(store.lookup(&fp_int).cloned(), Some(schema_int));
+        assert_eq!(store.lookup(&fp_long).cloned(), Some(schema_long));
+        assert_eq!(store.fingerprint_algorithm(), FingerprintAlgorithm::Rabin);
+    }
+
+    #[test]
+    fn test_unknown_fingerprint_is_error() {
+        let (store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let unknown_fp = Fingerprint::Rabin(0xDEAD_BEEF_DEAD_BEEF);
+        let prefix = make_prefix(unknown_fp);
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let err = decoder.decode(&prefix).expect_err("decode should error");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Unknown fingerprint"),
+            "unexpected message: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_missing_initial_fingerprint_error() {
+        let (store, _fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = ReaderBuilder::new()
+            .with_batch_size(8)
+            .with_reader_schema(schema_int.clone())
+            .with_writer_schema_store(store)
+            .build_decoder()
+            .unwrap();
+        let buf = [0x02u8, 0x00u8];
+        let err = decoder.decode(&buf).expect_err("decode should error");
+        let msg = err.to_string();
+        assert!(
+            msg.contains("Expected single‑object encoding fingerprint"),
+            "unexpected message: {msg}"
+        );
+    }
+
+    #[test]
+    fn test_handle_prefix_no_schema_store() {
+        let (store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        decoder.expect_prefix = false;
+        let res = decoder
+            .handle_prefix(&SINGLE_OBJECT_MAGIC[..])
+            .expect("handle_prefix");
+        assert!(res.is_none(), "Expected None when expect_prefix is false");
+    }
+
+    #[test]
+    fn test_handle_prefix_incomplete_magic() {
+        let (store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let buf = &SINGLE_OBJECT_MAGIC[..1];
+        let res = decoder.handle_prefix(buf).unwrap();
+        assert_eq!(res, Some(0));
+        assert!(decoder.pending_schema.is_none());
+    }
+
+    #[test]
+    fn test_handle_prefix_magic_mismatch() {
+        let (store, fp_int, _fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let buf = [0xFFu8, 0x00u8, 0x01u8];
+        let res = decoder.handle_prefix(&buf).unwrap();
+        assert!(res.is_none());
+    }
+
+    #[test]
+    fn test_handle_prefix_incomplete_fingerprint() {
+        let (store, fp_int, fp_long, schema_int, _schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let long_bytes = match fp_long {
+            Fingerprint::Rabin(v) => v.to_le_bytes(),
+        };
+        let mut buf = Vec::from(SINGLE_OBJECT_MAGIC);
+        buf.extend_from_slice(&long_bytes[..4]);
+        let res = decoder.handle_prefix(&buf).unwrap();
+        assert_eq!(res, Some(0));
+        assert!(decoder.pending_schema.is_none());
+    }
+
+    #[test]
+    fn test_handle_prefix_valid_prefix_switches_schema() {
+        let (store, fp_int, fp_long, schema_int, schema_long) = make_two_schema_store();
+        let mut decoder = make_decoder(&store, fp_int, &schema_int);
+        let writer_schema_long = schema_long.schema().unwrap();
+        let root_long = AvroFieldBuilder::new(&writer_schema_long).build().unwrap();
+        let long_decoder =
+            RecordDecoder::try_new_with_options(root_long.data_type(), decoder.utf8_view).unwrap();
+        let _ = decoder.cache.insert(fp_long, long_decoder);
+        let mut buf = Vec::from(SINGLE_OBJECT_MAGIC);
+        let Fingerprint::Rabin(v) = fp_long;
+        buf.extend_from_slice(&v.to_le_bytes());
+        let consumed = decoder.handle_prefix(&buf).unwrap().unwrap();
+        assert_eq!(consumed, buf.len());
+        assert!(decoder.pending_schema.is_some());
+        assert_eq!(decoder.pending_schema.as_ref().unwrap().0, fp_long);
+    }
+
     #[test]
     fn test_utf8view_support() {
         let schema_json = r#"{
@@ -793,28 +1148,31 @@ mod test {
             },
         ];
         for test in tests {
-            let schema_s2: crate::schema::Schema = serde_json::from_str(test.schema).unwrap();
+            let avro_schema = AvroSchema::new(test.schema.to_string());
+            let mut store = SchemaStore::new();
+            let fp = store.register(avro_schema.clone()).unwrap();
+            let prefix = make_prefix(fp);
             let record_val = "some_string";
-            let mut body = vec![];
+            let mut body = prefix;
             body.push((record_val.len() as u8) << 1);
             body.extend_from_slice(record_val.as_bytes());
-            let mut reader_placeholder = Cursor::new(&[] as &[u8]);
-            let builder = ReaderBuilder::new()
+            let decoder_res = ReaderBuilder::new()
                 .with_batch_size(1)
-                .with_schema(schema_s2);
-            let decoder_result = builder.build_decoder(&mut reader_placeholder);
-            let decoder = match decoder_result {
-                Ok(decoder) => decoder,
+                .with_writer_schema_store(store)
+                .with_active_fingerprint(fp)
+                .build_decoder();
+            let decoder = match decoder_res {
+                Ok(d) => d,
                 Err(e) => {
                     if let Some(expected) = test.expected_error {
                         assert!(
                             e.to_string().contains(expected),
-                            "Test '{}' failed: unexpected error message at build.\nExpected to contain: '{expected}'\nActual: '{e}'",
-                            test.name,
+                            "Test '{}' failed at build – expected '{expected}', got '{e}'",
+                            test.name
                         );
                         continue;
                     } else {
-                        panic!("Test '{}' failed at decoder build: {e}", test.name);
+                        panic!("Test '{}' failed during build: {e}", test.name);
                     }
                 }
             };
@@ -831,32 +1189,23 @@ mod test {
                     let expected_array = Arc::new(StringArray::from(vec![record_val]));
                     let expected_batch =
                         RecordBatch::try_new(expected_schema, vec![expected_array]).unwrap();
-                    assert_eq!(batch, expected_batch, "Test '{}' failed", test.name);
-                    assert_eq!(
-                        batch.schema().field(0).name(),
-                        "f2",
-                        "Test '{}' failed",
-                        test.name
-                    );
+                    assert_eq!(batch, expected_batch, "Test '{}'", test.name);
                 }
                 (Err(e), Some(expected)) => {
                     assert!(
                         e.to_string().contains(expected),
-                        "Test '{}' failed: unexpected error message at decode.\nExpected to contain: '{expected}'\nActual: '{e}'",
-                        test.name,
+                        "Test '{}' – expected error containing '{expected}', got '{e}'",
+                        test.name
                     );
                 }
-                (Ok(batches), Some(expected)) => {
+                (Ok(_), Some(expected)) => {
                     panic!(
-                        "Test '{}' was expected to fail with '{expected}', but it succeeded with: {:?}",
-                        test.name, batches
+                        "Test '{}' expected failure ('{expected}') but succeeded",
+                        test.name
                     );
                 }
                 (Err(e), None) => {
-                    panic!(
-                        "Test '{}' was not expected to fail, but it did with '{e}'",
-                        test.name
-                    );
+                    panic!("Test '{}' unexpectedly failed with '{e}'", test.name);
                 }
             }
         }

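For readers following the `handle_prefix` tests above, the single-object prefix they exercise can be sketched independently of the crate. This is a minimal illustration, not the code in this commit: the `Prefix` enum and `parse_prefix` function are hypothetical names, and the layout assumed is the one `make_prefix` builds in the tests (the two magic bytes `0xC3 0x01` followed by the 8-byte Rabin fingerprint in little-endian order).

```rust
/// Two-byte marker that starts an Avro single-object encoded message.
const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01];

/// Hypothetical result type for this sketch (not part of arrow-avro).
#[derive(Debug, PartialEq)]
enum Prefix {
    /// The buffer does not start with the magic bytes.
    NotPresent,
    /// The buffer ends before the full 10-byte prefix; more input is needed.
    Incomplete,
    /// A complete prefix: the fingerprint and the number of bytes it occupied.
    Found { fingerprint: u64, consumed: usize },
}

/// Inspect the start of `buf` for a single-object prefix.
fn parse_prefix(buf: &[u8]) -> Prefix {
    // Compare only as many magic bytes as are currently available.
    let n = buf.len().min(SINGLE_OBJECT_MAGIC.len());
    if buf[..n] != SINGLE_OBJECT_MAGIC[..n] {
        return Prefix::NotPresent;
    }
    // Magic (2 bytes) plus Rabin fingerprint (8 bytes).
    let total = SINGLE_OBJECT_MAGIC.len() + 8;
    if buf.len() < total {
        return Prefix::Incomplete;
    }
    let mut raw = [0u8; 8];
    raw.copy_from_slice(&buf[SINGLE_OBJECT_MAGIC.len()..total]);
    Prefix::Found {
        fingerprint: u64::from_le_bytes(raw),
        consumed: total,
    }
}
```

The three variants loosely correspond to the cases the tests check: a magic mismatch, a partial magic or partial fingerprint (where the decoder reports zero bytes consumed), and a full prefix that stages a schema switch. Looking the fingerprint up in a schema store is deliberately left out of the sketch.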