This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4a21443d2f Implement arrow-avro SchemaStore and Fingerprinting To
Enable Schema Resolution (#8006)
4a21443d2f is described below
commit 4a21443d2f18907f0fece066c8877afce6007550
Author: Connor Sanders <[email protected]>
AuthorDate: Thu Aug 7 15:33:02 2025 -0500
Implement arrow-avro SchemaStore and Fingerprinting To Enable Schema
Resolution (#8006)
# Which issue does this PR close?
- Part of https://github.com/apache/arrow-rs/issues/4886
- Follow up to https://github.com/apache/arrow-rs/pull/7834
# Rationale for this change
Apache Avro’s [single object
encoding](https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding)
prefixes every message with the marker `0xC3 0x01` followed by an
8-byte, little-endian CRC-64-AVRO (Rabin) [schema
fingerprint](https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints)
so that readers can identify the correct writer schema without carrying
the full definition in each message.
While the current `arrow-avro` implementation can read container files,
it cannot ingest these framed messages or handle streams where the
writer schema changes over time.
The Avro specification recommends computing the 64-bit CRC-64-AVRO
(Rabin) fingerprint of the [parsed canonical form of a
schema](https://avro.apache.org/docs/1.11.1/specification/#parsing-canonical-form-for-schemas)
and using it to look up the `Schema` in a local schema store or registry.
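Concretely, a framed message begins with a 10-byte prefix: the 2-byte magic `C3 01` plus the 64-bit fingerprint in little-endian order. The helper below is lifted from the benchmark changes in this commit:
```rust
use arrow_avro::schema::{Fingerprint, SINGLE_OBJECT_MAGIC};

// Builds the 10-byte single-object prefix that precedes every datum:
// the magic `C3 01` followed by the 64-bit Rabin fingerprint, little-endian.
fn make_prefix(fp: Fingerprint) -> [u8; 10] {
    let Fingerprint::Rabin(val) = fp;
    let mut buf = [0u8; 10];
    buf[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC);
    buf[2..].copy_from_slice(&val.to_le_bytes());
    buf
}
```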
This PR introduces **`SchemaStore`** and **fingerprinting** to enable:
* **Zero-copy schema identification** for decoding streaming Avro
messages published in single-object format (e.g., Kafka, Pulsar) into
Arrow (see the sketch after this list).
* **Dynamic schema evolution**, laying the foundation for resolving
writer/reader schema differences on the fly.
**NOTE:** Schema resolution support in `Codec` and `RecordDecoder` is
coming in the next PR.
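As a minimal sketch of the intended flow (method names taken from the diff below; the record schema is purely illustrative), registering a writer schema and building a streaming `Decoder` looks roughly like this:
```rust
use arrow_avro::reader::ReaderBuilder;
use arrow_avro::schema::{AvroSchema, SchemaStore};
use arrow_schema::ArrowError;

fn main() -> Result<(), ArrowError> {
    // Register every writer schema the stream may reference; `register`
    // returns the Rabin fingerprint of the schema's parsed canonical form.
    let mut store = SchemaStore::new();
    let schema = AvroSchema::new(
        r#"{"type":"record","name":"R","fields":[{"name":"f1","type":"long"}]}"#.to_string(),
    );
    let fingerprint = store.register(schema)?;

    // The decoder looks up the writer schema behind each `C3 01` +
    // fingerprint prefix in the store and switches decoders on the fly.
    let mut decoder = ReaderBuilder::new()
        .with_writer_schema_store(store)
        .with_active_fingerprint(fingerprint)
        .with_batch_size(1024)
        .build_decoder()?;
    let _ = &mut decoder; // feed bytes with `decoder.decode(..)`, then `flush()`
    Ok(())
}
```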
# What changes are included in this PR?
| Area | Highlights |
| --- | --- |
| **`reader/mod.rs`** | The `Decoder` now detects the `C3 01` prefix, extracts the fingerprint, looks up the writer schema in a `SchemaStore`, and switches to an LRU-cached `RecordDecoder` without interrupting streaming; supports `static_store_mode` to skip the 2-byte peek for high-throughput fixed-schema pipelines (see the sketch after this table). |
| **`ReaderBuilder`** | New builder configuration methods: `.with_writer_schema_store`, `.with_active_fingerprint`, `.with_static_store_mode`, `.with_reader_schema`, and `.with_max_decoder_cache_size`, with rigorous validation to prevent misconfiguration. |
| **Unit tests** | New tests covering fingerprint generation, store registration/lookup, schema switching, unknown-fingerprint errors, and interaction with Utf8View decoding. |
| **Docs & Examples** | Extensive inline docs with examples on all new public methods/structs. |
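For consumption, a caller pushes raw bytes into the `Decoder` and flushes completed batches; a sketch (the feeding loop and `frames` input are illustrative, not part of this commit):
```rust
use arrow_array::RecordBatch;
use arrow_avro::reader::Decoder;
use arrow_schema::ArrowError;

/// Feeds framed single-object messages into the decoder and collects
/// the batches it produces. `frames` is a hypothetical byte source.
fn drain(decoder: &mut Decoder, frames: &[Vec<u8>]) -> Result<Vec<RecordBatch>, ArrowError> {
    let mut batches = Vec::new();
    for frame in frames {
        let mut offset = 0;
        while offset < frame.len() {
            // `decode` returns the number of bytes consumed; it may stop
            // early, e.g. when a schema switch has been staged.
            let consumed = decoder.decode(&frame[offset..])?;
            offset += consumed;
            if decoder.batch_is_full() {
                if let Some(batch) = decoder.flush()? {
                    batches.push(batch);
                }
            } else if consumed == 0 {
                break; // decoder needs more bytes than this frame holds
            }
        }
    }
    // Flush any partially filled final batch.
    if let Some(batch) = decoder.flush()? {
        batches.push(batch);
    }
    Ok(batches)
}
```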
---
# Are these changes tested?
Yes. New tests cover:
1. **Fingerprinting** against the canonical examples from the Avro spec.
2. **`SchemaStore` behavior:** deduplication, duplicate registration, and
lookup.
3. **Decoder fast-path** with `static_store_mode=true`, ensuring the
prefix is treated as payload, the 2-byte peek is skipped, and no schema
switch is attempted.
# Are there any user-facing changes?
N/A
# Follow-Up PRs
1. Implement Schema Resolution Functionality in Codec and RecordDecoder
2. Add ID `Fingerprint` variant on `SchemaStore` for Confluent Schema
Registry compatibility
3. Improve arrow-avro errors + add more benchmarks & examples to prepare
for public release
---------
Co-authored-by: Ryan Johnson <[email protected]>
---
arrow-avro/Cargo.toml | 4 +-
arrow-avro/benches/decoder.rs | 159 +++++++-----
arrow-avro/src/codec.rs | 51 +++-
arrow-avro/src/reader/header.rs | 2 +-
arrow-avro/src/reader/mod.rs | 539 +++++++++++++++++++++++++++++++++-------
5 files changed, 589 insertions(+), 166 deletions(-)
diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml
index d5c9dc184e..1a1fc2f066 100644
--- a/arrow-avro/Cargo.toml
+++ b/arrow-avro/Cargo.toml
@@ -55,8 +55,10 @@ zstd = { version = "0.13", default-features = false,
optional = true }
bzip2 = { version = "0.6.0", optional = true }
xz = { version = "0.1", default-features = false, optional = true }
crc = { version = "3.0", optional = true }
-uuid = "1.17"
strum_macros = "0.27"
+uuid = "1.17"
+indexmap = "2.10"
+
[dev-dependencies]
arrow-data = { workspace = true }
diff --git a/arrow-avro/benches/decoder.rs b/arrow-avro/benches/decoder.rs
index 452f44e09e..df802daea1 100644
--- a/arrow-avro/benches/decoder.rs
+++ b/arrow-avro/benches/decoder.rs
@@ -27,58 +27,78 @@ extern crate uuid;
use apache_avro::types::Value;
use apache_avro::{to_avro_datum, Decimal, Schema as ApacheSchema};
-use arrow_avro::{reader::ReaderBuilder, schema::Schema as AvroSchema};
+use arrow_avro::schema::{Fingerprint, SINGLE_OBJECT_MAGIC};
+use arrow_avro::{reader::ReaderBuilder, schema::AvroSchema};
use criterion::{criterion_group, criterion_main, BatchSize, BenchmarkId,
Criterion, Throughput};
use once_cell::sync::Lazy;
-use std::{hint::black_box, io, time::Duration};
+use std::{hint::black_box, time::Duration};
use uuid::Uuid;
-fn encode_records(schema: &ApacheSchema, rows: impl Iterator<Item = Value>) ->
Vec<u8> {
+fn make_prefix(fp: Fingerprint) -> [u8; 10] {
+ let Fingerprint::Rabin(val) = fp;
+ let mut buf = [0u8; 10];
+ buf[..2].copy_from_slice(&SINGLE_OBJECT_MAGIC); // C3 01
+ buf[2..].copy_from_slice(&val.to_le_bytes()); // little‑endian 64‑bit
+ buf
+}
+
+fn encode_records_with_prefix(
+ schema: &ApacheSchema,
+ prefix: &[u8],
+ rows: impl Iterator<Item = Value>,
+) -> Vec<u8> {
let mut out = Vec::new();
for v in rows {
+ out.extend_from_slice(prefix);
out.extend_from_slice(&to_avro_datum(schema, v).expect("encode datum
failed"));
}
out
}
-fn gen_int(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_int(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as
i32))])),
)
}
-fn gen_long(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_long(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long(i as
i64))])),
)
}
-fn gen_float(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_float(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Float(i as
f32 + 0.5678))])),
)
}
-fn gen_bool(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_bool(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Boolean(i %
2 == 0))])),
)
}
-fn gen_double(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_double(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Double(i as
f64 + 0.1234))])),
)
}
-fn gen_bytes(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_bytes(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let payload = vec![(i & 0xFF) as u8; 16];
Value::Record(vec![("field1".into(), Value::Bytes(payload))])
@@ -86,9 +106,10 @@ fn gen_bytes(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_string(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let s = if i % 3 == 0 {
format!("value-{i}")
@@ -100,30 +121,34 @@ fn gen_string(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_date(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_date(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int(i as
i32))])),
)
}
-fn gen_timemillis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_timemillis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Int((i *
37) as i32))])),
)
}
-fn gen_timemicros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_timemicros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| Value::Record(vec![("field1".into(), Value::Long((i *
1_001) as i64))])),
)
}
-fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_ts_millis(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
Value::Record(vec![(
"field1".into(),
@@ -133,9 +158,10 @@ fn gen_ts_millis(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_ts_micros(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
Value::Record(vec![(
"field1".into(),
@@ -145,10 +171,11 @@ fn gen_ts_micros(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
+fn gen_map(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
use std::collections::HashMap;
- encode_records(
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let mut m = HashMap::new();
let int_val = |v: i32| Value::Union(0, Box::new(Value::Int(v)));
@@ -165,9 +192,10 @@ fn gen_map(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_array(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_array(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let items = (0..5).map(|j| Value::Int(i as i32 + j)).collect();
Value::Record(vec![("field1".into(), Value::Array(items))])
@@ -189,9 +217,10 @@ fn trim_i128_be(v: i128) -> Vec<u8> {
full[first..].to_vec()
}
-fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_decimal(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let unscaled = if i % 2 == 0 { i as i128 } else { -(i as i128) };
Value::Record(vec![(
@@ -202,9 +231,10 @@ fn gen_decimal(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_uuid(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let mut raw = (i as u128).to_be_bytes();
raw[6] = (raw[6] & 0x0F) | 0x40;
@@ -214,9 +244,10 @@ fn gen_uuid(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_fixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let mut buf = vec![0u8; 16];
buf[..8].copy_from_slice(&(i as u64).to_be_bytes());
@@ -225,9 +256,10 @@ fn gen_fixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_interval(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let months = (i % 24) as u32;
let days = (i % 32) as u32;
@@ -241,10 +273,11 @@ fn gen_interval(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
+fn gen_enum(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
const SYMBOLS: [&str; 3] = ["A", "B", "C"];
- encode_records(
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let idx = i % 3;
Value::Record(vec![(
@@ -255,9 +288,10 @@ fn gen_enum(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_mixed(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
Value::Record(vec![
("f1".into(), Value::Int(i as i32)),
@@ -269,9 +303,10 @@ fn gen_mixed(sc: &ApacheSchema, n: usize) -> Vec<u8> {
)
}
-fn gen_nested(sc: &ApacheSchema, n: usize) -> Vec<u8> {
- encode_records(
+fn gen_nested(sc: &ApacheSchema, n: usize, prefix: &[u8]) -> Vec<u8> {
+ encode_records_with_prefix(
sc,
+ prefix,
(0..n).map(|i| {
let sub = Value::Record(vec![
("x".into(), Value::Int(i as i32)),
@@ -290,12 +325,14 @@ fn new_decoder(
batch_size: usize,
utf8view: bool,
) -> arrow_avro::reader::Decoder {
- let schema: AvroSchema<'static> =
serde_json::from_str(schema_json).unwrap();
+ let schema = AvroSchema::new(schema_json.parse().unwrap());
+ let mut store = arrow_avro::schema::SchemaStore::new();
+ store.register(schema.clone()).unwrap();
ReaderBuilder::new()
- .with_schema(schema)
+ .with_writer_schema_store(store)
.with_batch_size(batch_size)
.with_utf8_view(utf8view)
- .build_decoder(io::empty())
+ .build_decoder()
.expect("failed to build decoder")
}
@@ -325,8 +362,8 @@ const ARRAY_SCHEMA: &str =
r#"{"type":"record","name":"ArrRec","fields":[{"name"
const DECIMAL_SCHEMA: &str =
r#"{"type":"record","name":"DecRec","fields":[{"name":"field1","type":{"type":"bytes","logicalType":"decimal","precision":10,"scale":3}}]}"#;
const UUID_SCHEMA: &str =
r#"{"type":"record","name":"UuidRec","fields":[{"name":"field1","type":{"type":"string","logicalType":"uuid"}}]}"#;
const FIXED_SCHEMA: &str =
r#"{"type":"record","name":"FixRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Fixed16","size":16}}]}"#;
-const INTERVAL_SCHEMA_ENCODE: &str =
r#"{"type":"record","name":"DurRecEnc","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
const INTERVAL_SCHEMA: &str =
r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12,"logicalType":"duration"}}]}"#;
+const INTERVAL_SCHEMA_ENCODE: &str =
r#"{"type":"record","name":"DurRec","fields":[{"name":"field1","type":{"type":"fixed","name":"Duration12","size":12}}]}"#;
const ENUM_SCHEMA: &str =
r#"{"type":"record","name":"EnumRec","fields":[{"name":"field1","type":{"type":"enum","name":"MyEnum","symbols":["A","B","C"]}}]}"#;
const MIX_SCHEMA: &str =
r#"{"type":"record","name":"MixRec","fields":[{"name":"f1","type":"int"},{"name":"f2","type":"long"},{"name":"f3","type":"string"},{"name":"f4","type":"double"}]}"#;
const NEST_SCHEMA: &str =
r#"{"type":"record","name":"NestRec","fields":[{"name":"sub","type":{"type":"record","name":"Sub","fields":[{"name":"x","type":"int"},{"name":"y","type":"string"}]}}]}"#;
@@ -336,7 +373,13 @@ macro_rules! dataset {
static $name: Lazy<Vec<Vec<u8>>> = Lazy::new(|| {
let schema =
ApacheSchema::parse_str($schema_json).expect("invalid schema
for generator");
- SIZES.iter().map(|&n| $gen_fn(&schema, n)).collect()
+ let arrow_schema = AvroSchema::new($schema_json.to_string());
+ let fingerprint = arrow_schema.fingerprint().expect("fingerprint
failed");
+ let prefix = make_prefix(fingerprint);
+ SIZES
+ .iter()
+ .map(|&n| $gen_fn(&schema, n, &prefix))
+ .collect()
});
};
}
@@ -406,6 +449,14 @@ fn bench_scenario(
fn criterion_benches(c: &mut Criterion) {
for &batch_size in &[SMALL_BATCH, LARGE_BATCH] {
+ bench_scenario(
+ c,
+ "Interval",
+ INTERVAL_SCHEMA,
+ &INTERVAL_DATA,
+ false,
+ batch_size,
+ );
bench_scenario(c, "Int32", INT_SCHEMA, &INT_DATA, false, batch_size);
bench_scenario(c, "Int64", LONG_SCHEMA, &LONG_DATA, false, batch_size);
bench_scenario(c, "Float32", FLOAT_SCHEMA, &FLOAT_DATA, false,
batch_size);
@@ -480,14 +531,6 @@ fn criterion_benches(c: &mut Criterion) {
false,
batch_size,
);
- bench_scenario(
- c,
- "Interval",
- INTERVAL_SCHEMA,
- &INTERVAL_DATA,
- false,
- batch_size,
- );
bench_scenario(
c,
"Enum(Dictionary)",
diff --git a/arrow-avro/src/codec.rs b/arrow-avro/src/codec.rs
index d4bba9a1ff..dcd3984501 100644
--- a/arrow-avro/src/codec.rs
+++ b/arrow-avro/src/codec.rs
@@ -15,10 +15,10 @@
// specific language governing permissions and limitations
// under the License.
-use crate::schema::{Attributes, ComplexType, PrimitiveType, Record, Schema,
TypeName};
+use crate::schema::{Attributes, AvroSchema, ComplexType, PrimitiveType,
Record, Schema, TypeName};
use arrow_schema::{
- ArrowError, DataType, Field, FieldRef, Fields, IntervalUnit,
SchemaBuilder, SchemaRef,
- TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE,
+ ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit,
DECIMAL128_MAX_PRECISION,
+ DECIMAL128_MAX_SCALE,
};
use std::borrow::Cow;
use std::collections::HashMap;
@@ -139,6 +139,22 @@ impl AvroField {
pub fn name(&self) -> &str {
&self.name
}
+
+ /// Performs schema resolution between a writer and reader schema.
+ ///
+ /// This is the primary entry point for handling schema evolution. It
produces an
+ /// `AvroField` that contains all the necessary information to read data
written
+ /// with the `writer` schema as if it were written with the `reader`
schema.
+ pub(crate) fn resolve_from_writer_and_reader<'a>(
+ writer_schema: &'a Schema<'a>,
+ reader_schema: &'a Schema<'a>,
+ use_utf8view: bool,
+ strict_mode: bool,
+ ) -> Result<Self, ArrowError> {
+ Err(ArrowError::NotYetImplemented(
+ "Resolving schema from a writer and reader schema is not yet
implemented".to_string(),
+ ))
+ }
}
impl<'a> TryFrom<&Schema<'a>> for AvroField {
@@ -164,21 +180,33 @@ impl<'a> TryFrom<&Schema<'a>> for AvroField {
/// Builder for an [`AvroField`]
#[derive(Debug)]
pub struct AvroFieldBuilder<'a> {
- schema: &'a Schema<'a>,
+ writer_schema: &'a Schema<'a>,
+ reader_schema: Option<AvroSchema>,
use_utf8view: bool,
strict_mode: bool,
}
impl<'a> AvroFieldBuilder<'a> {
- /// Creates a new [`AvroFieldBuilder`]
- pub fn new(schema: &'a Schema<'a>) -> Self {
+ /// Creates a new [`AvroFieldBuilder`] for a given writer schema.
+ pub fn new(writer_schema: &'a Schema<'a>) -> Self {
Self {
- schema,
+ writer_schema,
+ reader_schema: None,
use_utf8view: false,
strict_mode: false,
}
}
+ /// Sets the reader schema for schema resolution.
+ ///
+ /// If a reader schema is provided, the builder will produce a resolved
`AvroField`
+ /// that can handle differences between the writer's and reader's schemas.
+ #[inline]
+ pub fn with_reader_schema(mut self, reader_schema: AvroSchema) -> Self {
+ self.reader_schema = Some(reader_schema);
+ self
+ }
+
/// Enable or disable Utf8View support
pub fn with_utf8view(mut self, use_utf8view: bool) -> Self {
self.use_utf8view = use_utf8view;
@@ -193,11 +221,11 @@ impl<'a> AvroFieldBuilder<'a> {
/// Build an [`AvroField`] from the builder
pub fn build(self) -> Result<AvroField, ArrowError> {
- match self.schema {
+ match self.writer_schema {
Schema::Complex(ComplexType::Record(r)) => {
let mut resolver = Resolver::default();
let data_type = make_data_type(
- self.schema,
+ self.writer_schema,
None,
&mut resolver,
self.use_utf8view,
@@ -210,11 +238,12 @@ impl<'a> AvroFieldBuilder<'a> {
}
_ => Err(ArrowError::ParseError(format!(
"Expected a Record schema to build an AvroField, but got {:?}",
- self.schema
+ self.writer_schema
))),
}
}
}
+
/// An Avro encoding
///
/// <https://avro.apache.org/docs/1.11.1/specification/#encodings>
@@ -446,7 +475,7 @@ impl<'a> Resolver<'a> {
}
}
-/// Parses a [`AvroDataType`] from the provided [`Schema`] and the given
`name` and `namespace`
+/// Parses a [`AvroDataType`] from the provided `schema` and the given `name`
and `namespace`
///
/// `name`: is name used to refer to `schema` in its parent
/// `namespace`: an optional qualifier used as part of a type hierarchy
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index 0f7ffd3f8d..2d26df07aa 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -92,7 +92,7 @@ impl Header {
}
/// Returns the [`Schema`] if any
- pub fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
+ pub(crate) fn schema(&self) -> Result<Option<Schema<'_>>, ArrowError> {
self.get(SCHEMA_METADATA_KEY)
.map(|x| {
serde_json::from_slice(x).map_err(|e| {
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 18bc498cd2..e9bf7af61e 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -90,13 +90,18 @@
//! ```
//!
-use crate::codec::AvroFieldBuilder;
-use crate::schema::Schema as AvroSchema;
-use arrow_array::{RecordBatch, RecordBatchReader};
+use crate::codec::{AvroField, AvroFieldBuilder};
+use crate::schema::{
+ compare_schemas, generate_fingerprint, AvroSchema, Fingerprint,
FingerprintAlgorithm, Schema,
+ SchemaStore, SINGLE_OBJECT_MAGIC,
+};
+use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, SchemaRef};
use block::BlockDecoder;
use header::{Header, HeaderDecoder};
+use indexmap::IndexMap;
use record::RecordDecoder;
+use std::collections::HashMap;
use std::io::BufRead;
mod block;
@@ -128,23 +133,22 @@ fn read_header<R: BufRead>(mut reader: R) ->
Result<Header, ArrowError> {
/// A low-level interface for decoding Avro-encoded bytes into Arrow
`RecordBatch`.
#[derive(Debug)]
pub struct Decoder {
- record_decoder: RecordDecoder,
+ active_decoder: RecordDecoder,
+ active_fingerprint: Option<Fingerprint>,
batch_size: usize,
- decoded_rows: usize,
+ remaining_capacity: usize,
+ cache: IndexMap<Fingerprint, RecordDecoder>,
+ fingerprint_algorithm: FingerprintAlgorithm,
+ expect_prefix: bool,
+ utf8_view: bool,
+ strict_mode: bool,
+ pending_schema: Option<(Fingerprint, RecordDecoder)>,
}
impl Decoder {
- fn new(record_decoder: RecordDecoder, batch_size: usize) -> Self {
- Self {
- record_decoder,
- batch_size,
- decoded_rows: 0,
- }
- }
-
/// Return the Arrow schema for the rows decoded by this decoder
pub fn schema(&self) -> SchemaRef {
- self.record_decoder.schema().clone()
+ self.active_decoder.schema().clone()
}
/// Return the configured maximum number of rows per batch
@@ -158,39 +162,125 @@ impl Decoder {
///
/// Returns the number of bytes consumed.
pub fn decode(&mut self, data: &[u8]) -> Result<usize, ArrowError> {
+ if self.expect_prefix
+ && data.len() >= SINGLE_OBJECT_MAGIC.len()
+ && !data.starts_with(&SINGLE_OBJECT_MAGIC)
+ {
+ return Err(ArrowError::ParseError(
+ "Expected single‑object encoding fingerprint prefix for first
message \
+ (writer_schema_store is set but active_fingerprint is None)"
+ .into(),
+ ));
+ }
let mut total_consumed = 0usize;
- while total_consumed < data.len() && self.decoded_rows <
self.batch_size {
- let consumed = self.record_decoder.decode(&data[total_consumed..],
1)?;
- // A successful call to record_decoder.decode means one row was
decoded.
- // If `consumed` is 0 on a non-empty buffer, it implies a valid
zero-byte record.
- // We increment `decoded_rows` to mark progress and avoid an
infinite loop.
- // We add `consumed` (which can be 0) to `total_consumed`.
- total_consumed += consumed;
- self.decoded_rows += 1;
+ // The loop stops when the batch is full, a schema change is staged,
+ // or handle_prefix indicates we need more bytes (Some(0)).
+ while total_consumed < data.len() && self.remaining_capacity > 0 {
+ if let Some(n) = self.handle_prefix(&data[total_consumed..])? {
+ // We either consumed a prefix (n > 0) and need a schema
switch, or we need
+ // more bytes to make a decision. Either way, this decoding
attempt is finished.
+ total_consumed += n;
+ }
+ // No prefix: decode one row and keep going.
+ let n = self.active_decoder.decode(&data[total_consumed..], 1)?;
+ self.remaining_capacity -= 1;
+ total_consumed += n;
}
Ok(total_consumed)
}
+ // Attempt to handle a single‑object‑encoding prefix at the current
position.
+ //
+ // * Ok(None) – buffer does not start with the prefix.
+ // * Ok(Some(0)) – prefix detected, but the buffer is too short; caller
should await more bytes.
+ // * Ok(Some(n)) – consumed `n > 0` bytes of a complete prefix (magic and
fingerprint).
+ fn handle_prefix(&mut self, buf: &[u8]) -> Result<Option<usize>,
ArrowError> {
+ // If there is no schema store, prefixes are unrecognized.
+ if !self.expect_prefix {
+ return Ok(None);
+ }
+ // Need at least the magic bytes to decide (2 bytes).
+ let Some(magic_bytes) = buf.get(..SINGLE_OBJECT_MAGIC.len()) else {
+ return Ok(Some(0)); // Get more bytes
+ };
+ // Bail out early if the magic does not match.
+ if magic_bytes != SINGLE_OBJECT_MAGIC {
+ return Ok(None); // Continue to decode the next record
+ }
+ // Try to parse the fingerprint that follows the magic.
+ let fingerprint_size = match self.fingerprint_algorithm {
+ FingerprintAlgorithm::Rabin => self
+ .handle_fingerprint(&buf[SINGLE_OBJECT_MAGIC.len()..], |bytes|
{
+ Fingerprint::Rabin(u64::from_le_bytes(bytes))
+ })?,
+ };
+ // Convert the inner result into a “bytes consumed” count.
+ // NOTE: Incomplete fingerprint consumes no bytes.
+ let consumed = fingerprint_size.map_or(0, |n| n +
SINGLE_OBJECT_MAGIC.len());
+ Ok(Some(consumed))
+ }
+
+ // Attempts to read and install a new fingerprint of `N` bytes.
+ //
+ // * Ok(None) – insufficient bytes (`buf.len() < N`).
+ // * Ok(Some(N)) – fingerprint consumed (always `N`).
+ fn handle_fingerprint<const N: usize>(
+ &mut self,
+ buf: &[u8],
+ fingerprint_from: impl FnOnce([u8; N]) -> Fingerprint,
+ ) -> Result<Option<usize>, ArrowError> {
+ // Need enough bytes to get fingerprint (next N bytes)
+ let Some(fingerprint_bytes) = buf.get(..N) else {
+ return Ok(None); // Insufficient bytes
+ };
+ // SAFETY: length checked above.
+ let new_fingerprint =
fingerprint_from(fingerprint_bytes.try_into().unwrap());
+ // If the fingerprint indicates a schema change, prepare to switch
decoders.
+ if self.active_fingerprint != Some(new_fingerprint) {
+ let Some(new_decoder) = self.cache.shift_remove(&new_fingerprint)
else {
+ return Err(ArrowError::ParseError(format!(
+ "Unknown fingerprint: {new_fingerprint:?}"
+ )));
+ };
+ self.pending_schema = Some((new_fingerprint, new_decoder));
+ // If there are already decoded rows, we must flush them first.
+ // Reducing `remaining_capacity` to 0 ensures `flush` is called
next.
+ if self.remaining_capacity < self.batch_size {
+ self.remaining_capacity = 0;
+ }
+ }
+ Ok(Some(N))
+ }
+
/// Produce a `RecordBatch` if at least one row is fully decoded, returning
/// `Ok(None)` if no new rows are available.
pub fn flush(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
- if self.decoded_rows == 0 {
- Ok(None)
- } else {
- let batch = self.record_decoder.flush()?;
- self.decoded_rows = 0;
- Ok(Some(batch))
+ if self.remaining_capacity == self.batch_size {
+ return Ok(None);
+ }
+ let batch = self.active_decoder.flush()?;
+ self.remaining_capacity = self.batch_size;
+ // Apply any staged schema switch.
+ if let Some((new_fingerprint, new_decoder)) =
self.pending_schema.take() {
+ if let Some(old_fingerprint) =
self.active_fingerprint.replace(new_fingerprint) {
+ let old_decoder = std::mem::replace(&mut self.active_decoder,
new_decoder);
+ self.cache.shift_remove(&old_fingerprint);
+ self.cache.insert(old_fingerprint, old_decoder);
+ } else {
+ self.active_decoder = new_decoder;
+ }
}
+ Ok(Some(batch))
}
/// Returns the number of rows that can be added to this decoder before it
is full.
pub fn capacity(&self) -> usize {
- self.batch_size.saturating_sub(self.decoded_rows)
+ self.remaining_capacity
}
/// Returns true if the decoder has reached its capacity for the current
batch.
pub fn batch_is_full(&self) -> bool {
- self.capacity() == 0
+ self.remaining_capacity == 0
}
}
@@ -201,7 +291,9 @@ pub struct ReaderBuilder {
batch_size: usize,
strict_mode: bool,
utf8_view: bool,
- schema: Option<AvroSchema<'static>>,
+ reader_schema: Option<AvroSchema>,
+ writer_schema_store: Option<SchemaStore>,
+ active_fingerprint: Option<Fingerprint>,
}
impl Default for ReaderBuilder {
@@ -210,7 +302,9 @@ impl Default for ReaderBuilder {
batch_size: 1024,
strict_mode: false,
utf8_view: false,
- schema: None,
+ reader_schema: None,
+ writer_schema_store: None,
+ active_fingerprint: None,
}
}
}
@@ -220,34 +314,118 @@ impl ReaderBuilder {
/// - `batch_size` = 1024
/// - `strict_mode` = false
/// - `utf8_view` = false
- /// - `schema` = None
+ /// - `reader_schema` = None
+ /// - `writer_schema_store` = None
+ /// - `active_fingerprint` = None
pub fn new() -> Self {
Self::default()
}
- fn make_record_decoder(&self, schema: &AvroSchema<'_>) ->
Result<RecordDecoder, ArrowError> {
- let root_field = AvroFieldBuilder::new(schema)
+ fn make_record_decoder(
+ &self,
+ writer_schema: &Schema,
+ reader_schema: Option<&AvroSchema>,
+ ) -> Result<RecordDecoder, ArrowError> {
+ let mut builder = AvroFieldBuilder::new(writer_schema);
+ if let Some(reader_schema) = reader_schema {
+ builder = builder.with_reader_schema(reader_schema.clone());
+ }
+ let root = builder
.with_utf8view(self.utf8_view)
.with_strict_mode(self.strict_mode)
.build()?;
- RecordDecoder::try_new_with_options(root_field.data_type(),
self.utf8_view)
+ RecordDecoder::try_new_with_options(root.data_type(), self.utf8_view)
}
- fn build_impl<R: BufRead>(self, reader: &mut R) -> Result<(Header,
Decoder), ArrowError> {
- let header = read_header(reader)?;
- let record_decoder = if let Some(schema) = &self.schema {
- self.make_record_decoder(schema)?
- } else {
- let avro_schema: Option<AvroSchema<'_>> = header
+ fn make_decoder_with_parts(
+ &self,
+ active_decoder: RecordDecoder,
+ active_fingerprint: Option<Fingerprint>,
+ cache: IndexMap<Fingerprint, RecordDecoder>,
+ expect_prefix: bool,
+ fingerprint_algorithm: FingerprintAlgorithm,
+ ) -> Decoder {
+ Decoder {
+ batch_size: self.batch_size,
+ remaining_capacity: self.batch_size,
+ active_fingerprint,
+ active_decoder,
+ cache,
+ expect_prefix,
+ utf8_view: self.utf8_view,
+ fingerprint_algorithm,
+ strict_mode: self.strict_mode,
+ pending_schema: None,
+ }
+ }
+
+ fn make_decoder(
+ &self,
+ header: Option<&Header>,
+ reader_schema: Option<&AvroSchema>,
+ ) -> Result<Decoder, ArrowError> {
+ if let Some(hdr) = header {
+ let writer_schema = hdr
.schema()
- .map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
- let avro_schema = avro_schema.ok_or_else(|| {
- ArrowError::ParseError("No Avro schema present in file
header".to_string())
+ .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+ .ok_or_else(|| {
+ ArrowError::ParseError("No Avro schema present in file
header".into())
+ })?;
+ let record_decoder = self.make_record_decoder(&writer_schema,
reader_schema)?;
+ return Ok(self.make_decoder_with_parts(
+ record_decoder,
+ None,
+ IndexMap::new(),
+ false,
+ FingerprintAlgorithm::Rabin,
+ ));
+ }
+ let store = self.writer_schema_store.as_ref().ok_or_else(|| {
+ ArrowError::ParseError("Writer schema store required for raw
Avro".into())
+ })?;
+ let fingerprints = store.fingerprints();
+ if fingerprints.is_empty() {
+ return Err(ArrowError::ParseError(
+ "Writer schema store must contain at least one schema".into(),
+ ));
+ }
+ let start_fingerprint = self
+ .active_fingerprint
+ .or_else(|| fingerprints.first().copied())
+ .ok_or_else(|| {
+ ArrowError::ParseError("Could not determine initial schema
fingerprint".into())
})?;
- self.make_record_decoder(&avro_schema)?
- };
- let decoder = Decoder::new(record_decoder, self.batch_size);
- Ok((header, decoder))
+ let mut cache =
IndexMap::with_capacity(fingerprints.len().saturating_sub(1));
+ let mut active_decoder: Option<RecordDecoder> = None;
+ for fingerprint in store.fingerprints() {
+ let avro_schema = match store.lookup(&fingerprint) {
+ Some(schema) => schema,
+ None => {
+ return Err(ArrowError::ComputeError(format!(
+ "Fingerprint {fingerprint:?} not found in schema
store",
+ )));
+ }
+ };
+ let writer_schema = avro_schema.schema()?;
+ let decoder = self.make_record_decoder(&writer_schema,
reader_schema)?;
+ if fingerprint == start_fingerprint {
+ active_decoder = Some(decoder);
+ } else {
+ cache.insert(fingerprint, decoder);
+ }
+ }
+ let active_decoder = active_decoder.ok_or_else(|| {
+ ArrowError::ComputeError(format!(
+ "Initial fingerprint {start_fingerprint:?} not found in schema
store"
+ ))
+ })?;
+ Ok(self.make_decoder_with_parts(
+ active_decoder,
+ Some(start_fingerprint),
+ cache,
+ true,
+ store.fingerprint_algorithm(),
+ ))
}
/// Sets the row-based batch size
@@ -276,17 +454,42 @@ impl ReaderBuilder {
self
}
- /// Sets the Avro schema.
+ /// Sets the Avro reader schema.
///
/// If a schema is not provided, the schema will be read from the Avro
file header.
- pub fn with_schema(mut self, schema: AvroSchema<'static>) -> Self {
- self.schema = Some(schema);
+ pub fn with_reader_schema(mut self, schema: AvroSchema) -> Self {
+ self.reader_schema = Some(schema);
+ self
+ }
+
+ /// Sets the `SchemaStore` used for resolving writer schemas.
+ ///
+ /// This is necessary when decoding single-object encoded data that
identifies
+ /// schemas by a fingerprint. The store allows the decoder to look up the
+ /// full writer schema from a fingerprint embedded in the data.
+ ///
+ /// Defaults to `None`.
+ pub fn with_writer_schema_store(mut self, store: SchemaStore) -> Self {
+ self.writer_schema_store = Some(store);
+ self
+ }
+
+ /// Sets the initial schema fingerprint for decoding single-object encoded
data.
+ ///
+ /// This is useful when the data stream does not begin with a schema
definition
+ /// or fingerprint, allowing the decoder to start with a known schema from
the
+ /// `SchemaStore`.
+ ///
+ /// Defaults to `None`.
+ pub fn with_active_fingerprint(mut self, fp: Fingerprint) -> Self {
+ self.active_fingerprint = Some(fp);
self
}
/// Create a [`Reader`] from this builder and a `BufRead`
pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>,
ArrowError> {
- let (header, decoder) = self.build_impl(&mut reader)?;
+ let header = read_header(&mut reader)?;
+ let decoder = self.make_decoder(Some(&header),
self.reader_schema.as_ref())?;
Ok(Reader {
reader,
header,
@@ -298,20 +501,14 @@ impl ReaderBuilder {
})
}
- /// Create a [`Decoder`] from this builder and a `BufRead` by
- /// reading and parsing the Avro file's header. This will
- /// not create a full [`Reader`].
- pub fn build_decoder<R: BufRead>(self, mut reader: R) -> Result<Decoder,
ArrowError> {
- match self.schema {
- Some(ref schema) => {
- let record_decoder = self.make_record_decoder(schema)?;
- Ok(Decoder::new(record_decoder, self.batch_size))
- }
- None => {
- let (_, decoder) = self.build_impl(&mut reader)?;
- Ok(decoder)
- }
+ /// Create a [`Decoder`] from this builder.
+ pub fn build_decoder(self) -> Result<Decoder, ArrowError> {
+ if self.writer_schema_store.is_none() {
+ return Err(ArrowError::InvalidArgumentError(
+ "Building a decoder requires a writer schema
store".to_string(),
+ ));
}
+ self.make_decoder(None, self.reader_schema.as_ref())
}
}
@@ -391,11 +588,15 @@ impl<R: BufRead> RecordBatchReader for Reader<R> {
#[cfg(test)]
mod test {
- use crate::codec::{AvroDataType, AvroField, Codec};
+ use crate::codec::{AvroDataType, AvroField, AvroFieldBuilder, Codec};
use crate::compression::CompressionCodec;
use crate::reader::record::RecordDecoder;
use crate::reader::vlq::VLQDecoder;
use crate::reader::{read_header, Decoder, Reader, ReaderBuilder};
+ use crate::schema::{
+ AvroSchema, Fingerprint, FingerprintAlgorithm, PrimitiveType, Schema
as AvroRaw,
+ SchemaStore, SINGLE_OBJECT_MAGIC,
+ };
use crate::test_util::arrow_test_data;
use arrow::array::ArrayDataBuilder;
use arrow_array::builder::{
@@ -433,7 +634,7 @@ mod test {
batch_size: usize,
utf8_view: bool,
) -> Result<Reader<BufReader<File>>, ArrowError> {
- let file = File::open(path).unwrap();
+ let file = File::open(path)?;
ReaderBuilder::new()
.with_batch_size(batch_size)
.with_utf8_view(utf8_view)
@@ -460,6 +661,160 @@ mod test {
}
}
+ fn make_record_schema(pt: PrimitiveType) -> AvroSchema {
+ let js = format!(
+
r#"{{"type":"record","name":"TestRecord","fields":[{{"name":"a","type":"{}"}}]}}"#,
+ pt.as_ref()
+ );
+ AvroSchema::new(js)
+ }
+
+ fn make_two_schema_store() -> (
+ SchemaStore,
+ Fingerprint,
+ Fingerprint,
+ AvroSchema,
+ AvroSchema,
+ ) {
+ let schema_int = make_record_schema(PrimitiveType::Int);
+ let schema_long = make_record_schema(PrimitiveType::Long);
+ let mut store = SchemaStore::new();
+ let fp_int = store
+ .register(schema_int.clone())
+ .expect("register int schema");
+ let fp_long = store
+ .register(schema_long.clone())
+ .expect("register long schema");
+ (store, fp_int, fp_long, schema_int, schema_long)
+ }
+
+ fn make_prefix(fp: Fingerprint) -> Vec<u8> {
+ match fp {
+ Fingerprint::Rabin(v) => {
+ let mut out = Vec::with_capacity(2 + 8);
+ out.extend_from_slice(&SINGLE_OBJECT_MAGIC);
+ out.extend_from_slice(&v.to_le_bytes());
+ out
+ }
+ }
+ }
+
+ fn make_decoder(store: &SchemaStore, fp: Fingerprint, reader_schema:
&AvroSchema) -> Decoder {
+ ReaderBuilder::new()
+ .with_batch_size(8)
+ .with_reader_schema(reader_schema.clone())
+ .with_writer_schema_store(store.clone())
+ .with_active_fingerprint(fp)
+ .build_decoder()
+ .expect("decoder")
+ }
+
+ #[test]
+ fn test_schema_store_register_lookup() {
+ let schema_int = make_record_schema(PrimitiveType::Int);
+ let schema_long = make_record_schema(PrimitiveType::Long);
+ let mut store = SchemaStore::new();
+ let fp_int = store.register(schema_int.clone()).unwrap();
+ let fp_long = store.register(schema_long.clone()).unwrap();
+ assert_eq!(store.lookup(&fp_int).cloned(), Some(schema_int));
+ assert_eq!(store.lookup(&fp_long).cloned(), Some(schema_long));
+ assert_eq!(store.fingerprint_algorithm(), FingerprintAlgorithm::Rabin);
+ }
+
+ #[test]
+ fn test_unknown_fingerprint_is_error() {
+ let (store, fp_int, _fp_long, schema_int, _schema_long) =
make_two_schema_store();
+ let unknown_fp = Fingerprint::Rabin(0xDEAD_BEEF_DEAD_BEEF);
+ let prefix = make_prefix(unknown_fp);
+ let mut decoder = make_decoder(&store, fp_int, &schema_int);
+ let err = decoder.decode(&prefix).expect_err("decode should error");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Unknown fingerprint"),
+ "unexpected message: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_missing_initial_fingerprint_error() {
+ let (store, _fp_int, _fp_long, schema_int, _schema_long) =
make_two_schema_store();
+ let mut decoder = ReaderBuilder::new()
+ .with_batch_size(8)
+ .with_reader_schema(schema_int.clone())
+ .with_writer_schema_store(store)
+ .build_decoder()
+ .unwrap();
+ let buf = [0x02u8, 0x00u8];
+ let err = decoder.decode(&buf).expect_err("decode should error");
+ let msg = err.to_string();
+ assert!(
+ msg.contains("Expected single‑object encoding fingerprint"),
+ "unexpected message: {msg}"
+ );
+ }
+
+ #[test]
+ fn test_handle_prefix_no_schema_store() {
+ let (store, fp_int, _fp_long, schema_int, _schema_long) =
make_two_schema_store();
+ let mut decoder = make_decoder(&store, fp_int, &schema_int);
+ decoder.expect_prefix = false;
+ let res = decoder
+ .handle_prefix(&SINGLE_OBJECT_MAGIC[..])
+ .expect("handle_prefix");
+ assert!(res.is_none(), "Expected None when expect_prefix is false");
+ }
+
+ #[test]
+ fn test_handle_prefix_incomplete_magic() {
+ let (store, fp_int, _fp_long, schema_int, _schema_long) =
make_two_schema_store();
+ let mut decoder = make_decoder(&store, fp_int, &schema_int);
+ let buf = &SINGLE_OBJECT_MAGIC[..1];
+ let res = decoder.handle_prefix(buf).unwrap();
+ assert_eq!(res, Some(0));
+ assert!(decoder.pending_schema.is_none());
+ }
+
+ #[test]
+ fn test_handle_prefix_magic_mismatch() {
+ let (store, fp_int, _fp_long, schema_int, _schema_long) =
make_two_schema_store();
+ let mut decoder = make_decoder(&store, fp_int, &schema_int);
+ let buf = [0xFFu8, 0x00u8, 0x01u8];
+ let res = decoder.handle_prefix(&buf).unwrap();
+ assert!(res.is_none());
+ }
+
+ #[test]
+ fn test_handle_prefix_incomplete_fingerprint() {
+ let (store, fp_int, fp_long, schema_int, _schema_long) =
make_two_schema_store();
+ let mut decoder = make_decoder(&store, fp_int, &schema_int);
+ let long_bytes = match fp_long {
+ Fingerprint::Rabin(v) => v.to_le_bytes(),
+ };
+ let mut buf = Vec::from(SINGLE_OBJECT_MAGIC);
+ buf.extend_from_slice(&long_bytes[..4]);
+ let res = decoder.handle_prefix(&buf).unwrap();
+ assert_eq!(res, Some(0));
+ assert!(decoder.pending_schema.is_none());
+ }
+
+ #[test]
+ fn test_handle_prefix_valid_prefix_switches_schema() {
+ let (store, fp_int, fp_long, schema_int, schema_long) =
make_two_schema_store();
+ let mut decoder = make_decoder(&store, fp_int, &schema_int);
+ let writer_schema_long = schema_long.schema().unwrap();
+ let root_long =
AvroFieldBuilder::new(&writer_schema_long).build().unwrap();
+ let long_decoder =
+ RecordDecoder::try_new_with_options(root_long.data_type(),
decoder.utf8_view).unwrap();
+ let _ = decoder.cache.insert(fp_long, long_decoder);
+ let mut buf = Vec::from(SINGLE_OBJECT_MAGIC);
+ let Fingerprint::Rabin(v) = fp_long;
+ buf.extend_from_slice(&v.to_le_bytes());
+ let consumed = decoder.handle_prefix(&buf).unwrap().unwrap();
+ assert_eq!(consumed, buf.len());
+ assert!(decoder.pending_schema.is_some());
+ assert_eq!(decoder.pending_schema.as_ref().unwrap().0, fp_long);
+ }
+
#[test]
fn test_utf8view_support() {
let schema_json = r#"{
@@ -793,28 +1148,31 @@ mod test {
},
];
for test in tests {
- let schema_s2: crate::schema::Schema =
serde_json::from_str(test.schema).unwrap();
+ let avro_schema = AvroSchema::new(test.schema.to_string());
+ let mut store = SchemaStore::new();
+ let fp = store.register(avro_schema.clone()).unwrap();
+ let prefix = make_prefix(fp);
let record_val = "some_string";
- let mut body = vec![];
+ let mut body = prefix;
body.push((record_val.len() as u8) << 1);
body.extend_from_slice(record_val.as_bytes());
- let mut reader_placeholder = Cursor::new(&[] as &[u8]);
- let builder = ReaderBuilder::new()
+ let decoder_res = ReaderBuilder::new()
.with_batch_size(1)
- .with_schema(schema_s2);
- let decoder_result = builder.build_decoder(&mut
reader_placeholder);
- let decoder = match decoder_result {
- Ok(decoder) => decoder,
+ .with_writer_schema_store(store)
+ .with_active_fingerprint(fp)
+ .build_decoder();
+ let decoder = match decoder_res {
+ Ok(d) => d,
Err(e) => {
if let Some(expected) = test.expected_error {
assert!(
e.to_string().contains(expected),
- "Test '{}' failed: unexpected error message at
build.\nExpected to contain: '{expected}'\nActual: '{e}'",
- test.name,
+ "Test '{}' failed at build – expected
'{expected}', got '{e}'",
+ test.name
);
continue;
} else {
- panic!("Test '{}' failed at decoder build: {e}",
test.name);
+ panic!("Test '{}' failed during build: {e}",
test.name);
}
}
};
@@ -831,32 +1189,23 @@ mod test {
let expected_array =
Arc::new(StringArray::from(vec![record_val]));
let expected_batch =
RecordBatch::try_new(expected_schema,
vec![expected_array]).unwrap();
- assert_eq!(batch, expected_batch, "Test '{}' failed",
test.name);
- assert_eq!(
- batch.schema().field(0).name(),
- "f2",
- "Test '{}' failed",
- test.name
- );
+ assert_eq!(batch, expected_batch, "Test '{}'", test.name);
}
(Err(e), Some(expected)) => {
assert!(
e.to_string().contains(expected),
- "Test '{}' failed: unexpected error message at
decode.\nExpected to contain: '{expected}'\nActual: '{e}'",
- test.name,
+ "Test '{}' – expected error containing '{expected}',
got '{e}'",
+ test.name
);
}
- (Ok(batches), Some(expected)) => {
+ (Ok(_), Some(expected)) => {
panic!(
- "Test '{}' was expected to fail with '{expected}', but
it succeeded with: {:?}",
- test.name, batches
+ "Test '{}' expected failure ('{expected}') but
succeeded",
+ test.name
);
}
(Err(e), None) => {
- panic!(
- "Test '{}' was not expected to fail, but it did with
'{e}'",
- test.name
- );
+ panic!("Test '{}' unexpectedly failed with '{e}'",
test.name);
}
}
}