This is an automated email from the ASF dual-hosted git repository.
mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 348ae916ce Add support for 64-bit Schema Registry IDs (Id64) in
arrow-avro (#8575)
348ae916ce is described below
commit 348ae916cef930dbae7143f2b0cbb362eb7abeef
Author: Connor Sanders <[email protected]>
AuthorDate: Fri Oct 10 02:32:43 2025 -0500
Add support for 64-bit Schema Registry IDs (Id64) in arrow-avro (#8575)
# Which issue does this PR close?
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases.
You can link an issue to this PR using the GitHub syntax.
- Part of #4886
# Rationale for this change
Many Kafka deployments use **Confluent Schema Registry** (4‑byte,
big‑endian ID) and/or **Apicurio Registry** (commonly 8‑byte, big‑endian
*global* ID). `arrow-avro` already supported the Confluent wire format;
this PR adds first‑class support for Apicurio’s 64‑bit ID, enabling
seamless decode/encode of streams that carry an 8‑byte ID after the
magic byte.
This improves interoperability with ecosystems that standardize on
Apicurio or Red Hat Event Streams.
# What changes are included in this PR?
* **New 64‑bit ID support**
* Add `Fingerprint::Id64(u64)` and `FingerprintAlgorithm::Id64`.
* Add `FingerprintStrategy::Id64(u64)` and helper
`Fingerprint::load_fingerprint_id64`.
* Extend `Fingerprint::serialized_prefix` to emit/read an 8‑byte
big‑endian ID after the `0x00` magic byte.
* **Clarify/align numeric‑ID algorithm names**
* Replace the prior `FingerprintAlgorithm::None` (numeric ID) with
`FingerprintAlgorithm::Id` (4‑byte) and introduce `Id64` (8‑byte). All
examples and call sites are updated accordingly (e.g.,
`SchemaStore::new_with_type(FingerprintAlgorithm::Id)`).
* **Reader/Writer plumbing**
* `Decoder` now understands both `Id` (4‑byte) and `Id64` (8‑byte)
prefixes.
* `WriterBuilder` accepts `FingerprintStrategy::Id64` to write frames
with a 64‑bit ID.
* **SchemaStore behavior**
* `SchemaStore::register` now errors for `Id`/`Id64` algorithms (as
those IDs come from a registry); callers should use
`set(Fingerprint::Id(_)|Id64(_), ...)` to associate schemas by registry
ID.
* **Docs & examples**
* Reader docs expanded to call out Confluent (4‑byte) and Apicurio
(8‑byte) formats; examples switched to `FingerprintAlgorithm::Id`. Bench
and example updates reflect the new variants.
# Are these changes tested?
Yes. This PR adds/updates unit tests that exercise the new path
end‑to‑end, including:
* `test_stream_writer_with_id64_fingerprint_rt` (writer round‑trip with
64‑bit ID).
* `test_two_messages_same_schema_id64` (decoder round‑trip with 64‑bit
ID).
* Adjustments to existing tests and benches to use
`FingerprintAlgorithm::Id` instead of `None`.
# Are there any user-facing changes?
N/A because `arrow-avro` isn't public yet.
---------
Co-authored-by: Matthijs Brobbel <[email protected]>
---
arrow-avro/benches/decoder.rs | 8 ++-
arrow-avro/examples/decode_kafka_stream.rs | 4 +-
arrow-avro/src/lib.rs | 2 +-
arrow-avro/src/reader/mod.rs | 82 ++++++++++++++++++++++----
arrow-avro/src/schema.rs | 92 +++++++++++++++++++++++++-----
arrow-avro/src/writer/mod.rs | 37 +++++++++++-
6 files changed, 195 insertions(+), 30 deletions(-)
diff --git a/arrow-avro/benches/decoder.rs b/arrow-avro/benches/decoder.rs
index 5db99b4db7..7180826b7b 100644
--- a/arrow-avro/benches/decoder.rs
+++ b/arrow-avro/benches/decoder.rs
@@ -48,6 +48,12 @@ fn make_prefix(fp: Fingerprint) -> Vec<u8> {
buf.extend_from_slice(&id.to_be_bytes()); // big-endian
buf
}
+ Fingerprint::Id64(id) => {
+ let mut buf = Vec::with_capacity(CONFLUENT_MAGIC.len() +
size_of::<u64>());
+ buf.extend_from_slice(&CONFLUENT_MAGIC); // 00
+ buf.extend_from_slice(&id.to_be_bytes()); // big-endian
+ buf
+ }
#[cfg(feature = "md5")]
Fingerprint::MD5(val) => {
let mut buf = Vec::with_capacity(SINGLE_OBJECT_MAGIC.len() +
size_of_val(&val));
@@ -366,7 +372,7 @@ fn new_decoder_id(
id: u32,
) -> arrow_avro::reader::Decoder {
let schema = AvroSchema::new(schema_json.parse().unwrap());
- let mut store =
arrow_avro::schema::SchemaStore::new_with_type(FingerprintAlgorithm::None);
+ let mut store =
arrow_avro::schema::SchemaStore::new_with_type(FingerprintAlgorithm::Id);
// Register the schema with a provided Confluent-style ID
store
.set(Fingerprint::Id(id), schema.clone())
diff --git a/arrow-avro/examples/decode_kafka_stream.rs
b/arrow-avro/examples/decode_kafka_stream.rs
index 84ea3d3931..46309ecd0c 100644
--- a/arrow-avro/examples/decode_kafka_stream.rs
+++ b/arrow-avro/examples/decode_kafka_stream.rs
@@ -171,8 +171,8 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
let id_v0: u32 = 0;
let id_v1: u32 = 1;
- // Confluent SchemaStore keyed by integer IDs (FingerprintAlgorithm::None)
- let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+ // Confluent SchemaStore keyed by integer IDs (FingerprintAlgorithm::Id)
+ let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;
diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs
index 57215b55cf..588334bab7 100644
--- a/arrow-avro/src/lib.rs
+++ b/arrow-avro/src/lib.rs
@@ -95,7 +95,7 @@
//! "fields":[{"name":"id","type":"long"},{"name":"name","type":"string"}]
//! }"#;
//!
-//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
//! let id: u32 = 1;
//! store.set(Fingerprint::Id(id), AvroSchema::new(avro_json.to_string()))?;
//!
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 2107156e7a..cd1833d35c 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -60,9 +60,14 @@
//!
<https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
//! * **Confluent Schema Registry wire format**: A 1‑byte magic `0x00`, a
**4‑byte big‑endian**
//! schema ID, then the Avro‑encoded body. Use `Decoder` with a
`SchemaStore` configured
-//! for `FingerprintAlgorithm::None` and entries keyed by `Fingerprint::Id`.
See
+//! for `FingerprintAlgorithm::Id` and entries keyed by `Fingerprint::Id`.
See
//! Confluent’s “Wire format” documentation.
//!
<https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
+//! * **Apicurio Schema Registry wire format**: A 1‑byte magic `0x00`, a
**8‑byte big‑endian**
+//! global schema ID, then the Avro‑encoded body. Use `Decoder` with a
`SchemaStore` configured
+//! for `FingerprintAlgorithm::Id64` and entries keyed by
`Fingerprint::Id64`. See
+//! Apicurio’s “Avro SerDe” documentation.
+//!
<https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry>
//!
//! ## Basic file usage (OCF)
//!
@@ -99,7 +104,7 @@
//! # Ok(()) }
//! ```
//!
-//! ## Streaming usage (single‑object / Confluent)
+//! ## Streaming usage (single‑object / Confluent / Apicurio)
//!
//! The `Decoder` lets you integrate Avro decoding with **any** source of
bytes by
//! periodically calling `Decoder::decode` with new data and calling
`Decoder::flush`
@@ -220,7 +225,7 @@
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Set up a store keyed by numeric IDs (Confluent).
-//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
//! let schema_id = 7u32;
//! let avro_schema =
AvroSchema::new(r#"{"type":"record","name":"User","fields":[
//! {"name":"id","type":"long"},
{"name":"name","type":"string"}]}"#.to_string());
@@ -380,7 +385,7 @@
//! let id_v0: u32 = 0;
//! let id_v1: u32 = 1;
//!
-//! let mut store =
SchemaStore::new_with_type(FingerprintAlgorithm::None); // integer IDs
+//! let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
// integer IDs
//! store.set(Fingerprint::Id(id_v0), writer_v0.clone())?;
//! store.set(Fingerprint::Id(id_v1), writer_v1.clone())?;
//!
@@ -591,7 +596,7 @@ fn is_incomplete_data(err: &ArrowError) -> bool {
/// use arrow_avro::reader::ReaderBuilder;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
-/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
/// store.set(Fingerprint::Id(1234),
AvroSchema::new(r#"{"type":"record","name":"E","fields":[{"name":"x","type":"long"}]}"#.to_string()))?;
///
/// // --- Hidden: encode two Confluent-framed messages {x:1} and {x:2} ---
@@ -713,9 +718,12 @@ impl Decoder {
Fingerprint::Rabin(u64::from_le_bytes(bytes))
})
}
- FingerprintAlgorithm::None => {
+ FingerprintAlgorithm::Id => self.handle_prefix_common(buf,
&CONFLUENT_MAGIC, |bytes| {
+ Fingerprint::Id(u32::from_be_bytes(bytes))
+ }),
+ FingerprintAlgorithm::Id64 => {
self.handle_prefix_common(buf, &CONFLUENT_MAGIC, |bytes| {
- Fingerprint::Id(u32::from_be_bytes(bytes))
+ Fingerprint::Id64(u64::from_be_bytes(bytes))
})
}
#[cfg(feature = "md5")]
@@ -909,7 +917,7 @@ impl Decoder {
/// use arrow_avro::schema::{AvroSchema, SchemaStore, Fingerprint,
FingerprintAlgorithm};
/// use arrow_avro::reader::ReaderBuilder;
///
-/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+/// let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
/// store.set(Fingerprint::Id(1234),
AvroSchema::new(r#"{"type":"record","name":"E","fields":[]}"#.to_string()))?;
///
/// let decoder = ReaderBuilder::new()
@@ -1409,6 +1417,9 @@ mod test {
Fingerprint::Id(v) => {
panic!("make_prefix expects a Rabin fingerprint, got ({v})");
}
+ Fingerprint::Id64(v) => {
+ panic!("make_prefix expects a Rabin fingerprint, got ({v})");
+ }
#[cfg(feature = "md5")]
Fingerprint::MD5(v) => {
panic!("make_prefix expects a Rabin fingerprint, got ({v:?})");
@@ -1445,6 +1456,21 @@ mod test {
msg
}
+ fn make_id64_prefix(id: u64, additional: usize) -> Vec<u8> {
+ let capacity = CONFLUENT_MAGIC.len() + size_of::<u64>() + additional;
+ let mut out = Vec::with_capacity(capacity);
+ out.extend_from_slice(&CONFLUENT_MAGIC);
+ out.extend_from_slice(&id.to_be_bytes());
+ out
+ }
+
+ fn make_message_id64(id: u64, value: i64) -> Vec<u8> {
+ let encoded_value = encode_zigzag(value);
+ let mut msg = make_id64_prefix(id, encoded_value.len());
+ msg.extend_from_slice(&encoded_value);
+ msg
+ }
+
fn make_value_schema(pt: PrimitiveType) -> AvroSchema {
let json_schema = format!(
r#"{{"type":"record","name":"S","fields":[{{"name":"v","type":"{}"}}]}}"#,
@@ -2159,6 +2185,7 @@ mod test {
let long_bytes = match fp_long {
Fingerprint::Rabin(v) => v.to_le_bytes(),
Fingerprint::Id(id) => panic!("expected Rabin fingerprint, got
({id})"),
+ Fingerprint::Id64(id) => panic!("expected Rabin fingerprint, got
({id})"),
#[cfg(feature = "md5")]
Fingerprint::MD5(v) => panic!("expected Rabin fingerprint, got
({v:?})"),
#[cfg(feature = "sha256")]
@@ -2183,6 +2210,7 @@ mod test {
match fp_long {
Fingerprint::Rabin(v) => buf.extend_from_slice(&v.to_le_bytes()),
Fingerprint::Id(id) => panic!("expected Rabin fingerprint, got
({id})"),
+ Fingerprint::Id64(id) => panic!("expected Rabin fingerprint, got
({id})"),
#[cfg(feature = "md5")]
Fingerprint::MD5(v) => panic!("expected Rabin fingerprint, got
({v:?})"),
#[cfg(feature = "sha256")]
@@ -2269,7 +2297,7 @@ mod test {
let reader_schema = writer_schema.clone();
let id = 100u32;
// Set up store with None fingerprint algorithm and register schema by
id
- let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+ let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
let _ = store
.set(Fingerprint::Id(id), writer_schema.clone())
.expect("set id schema");
@@ -2300,7 +2328,7 @@ mod test {
let writer_schema = make_value_schema(PrimitiveType::Int);
let id_known = 7u32;
let id_unknown = 9u32;
- let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+ let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
let _ = store
.set(Fingerprint::Id(id_known), writer_schema.clone())
.expect("set id schema");
@@ -2324,7 +2352,7 @@ mod test {
fn test_handle_prefix_id_incomplete_magic() {
let writer_schema = make_value_schema(PrimitiveType::Int);
let id = 5u32;
- let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+ let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
let _ = store
.set(Fingerprint::Id(id), writer_schema.clone())
.expect("set id schema");
@@ -2341,6 +2369,38 @@ mod test {
assert!(decoder.pending_schema.is_none());
}
+ #[test]
+ fn test_two_messages_same_schema_id64() {
+ let writer_schema = make_value_schema(PrimitiveType::Int);
+ let reader_schema = writer_schema.clone();
+ let id = 100u64;
+ // Set up store with None fingerprint algorithm and register schema by
id
+ let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
+ let _ = store
+ .set(Fingerprint::Id64(id), writer_schema.clone())
+ .expect("set id schema");
+ let msg1 = make_message_id64(id, 21);
+ let msg2 = make_message_id64(id, 22);
+ let input = [msg1.clone(), msg2.clone()].concat();
+ let mut decoder = ReaderBuilder::new()
+ .with_batch_size(8)
+ .with_reader_schema(reader_schema)
+ .with_writer_schema_store(store)
+ .with_active_fingerprint(Fingerprint::Id64(id))
+ .build_decoder()
+ .unwrap();
+ let _ = decoder.decode(&input).unwrap();
+ let batch = decoder.flush().unwrap().expect("batch");
+ assert_eq!(batch.num_rows(), 2);
+ let col = batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(col.value(0), 21);
+ assert_eq!(col.value(1), 22);
+ }
+
#[test]
fn test_decode_stream_with_schema() {
struct TestCase<'a> {
diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs
index b3081bbd09..cce9a7d1f8 100644
--- a/arrow-avro/src/schema.rs
+++ b/arrow-avro/src/schema.rs
@@ -365,16 +365,16 @@ impl AvroSchema {
/// - `Fingerprint::MD5` for `FingerprintAlgorithm::MD5`
/// - `Fingerprint::SHA256` for `FingerprintAlgorithm::SHA256`
///
- /// Note: [`FingerprintAlgorithm::None`] cannot be used to generate a
fingerprint
+ /// Note: [`FingerprintAlgorithm::Id`] or [`FingerprintAlgorithm::Id64`]
cannot be used to generate a fingerprint
/// and will result in an error. If you intend to use a Schema Registry
ID-based
- /// wire format, load or set the [`Fingerprint::Id`] directly via
[`Fingerprint::load_fingerprint_id`]
- /// or [`SchemaStore::set`].
+ /// wire format, either use [`SchemaStore::set`] or load the
[`Fingerprint::Id`] directly via [`Fingerprint::load_fingerprint_id`] or for
+ /// [`Fingerprint::Id64`] via [`Fingerprint::load_fingerprint_id64`].
///
/// See also:
<https://avro.apache.org/docs/1.11.1/specification/#schema-fingerprints>
///
/// # Errors
/// Returns an error if deserializing the schema fails, if generating the
- /// canonical form of the schema fails, or if `hash_type` is
[`FingerprintAlgorithm::None`].
+ /// canonical form of the schema fails, or if `hash_type` is
[`FingerprintAlgorithm::Id`].
///
/// # Examples
/// ```
@@ -398,8 +398,8 @@ impl AvroSchema {
FingerprintAlgorithm::Rabin => {
Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical)))
}
- FingerprintAlgorithm::None => Err(ArrowError::SchemaError(
- "FingerprintAlgorithm of None cannot be used to generate a
fingerprint; \
+ FingerprintAlgorithm::Id | FingerprintAlgorithm::Id64 =>
Err(ArrowError::SchemaError(
+ "FingerprintAlgorithm of Id or Id64 cannot be used to generate
a fingerprint; \
if using Fingerprint::Id, pass the registry ID in instead
using the set method."
.to_string(),
)),
@@ -492,6 +492,8 @@ pub enum FingerprintStrategy {
Rabin,
/// Use a Confluent Schema Registry 32-bit ID.
Id(u32),
+ /// Use an Apicurio Schema Registry 64-bit ID.
+ Id64(u64),
#[cfg(feature = "md5")]
/// Use the 128-bit MD5 fingerprint.
MD5,
@@ -510,7 +512,8 @@ impl From<FingerprintAlgorithm> for FingerprintStrategy {
fn from(f: FingerprintAlgorithm) -> Self {
match f {
FingerprintAlgorithm::Rabin => FingerprintStrategy::Rabin,
- FingerprintAlgorithm::None => FingerprintStrategy::Id(0),
+ FingerprintAlgorithm::Id => FingerprintStrategy::Id(0),
+ FingerprintAlgorithm::Id64 => FingerprintStrategy::Id64(0),
#[cfg(feature = "md5")]
FingerprintAlgorithm::MD5 => FingerprintStrategy::MD5,
#[cfg(feature = "sha256")]
@@ -524,6 +527,7 @@ impl From<&Fingerprint> for FingerprintStrategy {
match f {
Fingerprint::Rabin(_) => FingerprintStrategy::Rabin,
Fingerprint::Id(_) => FingerprintStrategy::Id(0),
+ Fingerprint::Id64(_) => FingerprintStrategy::Id64(0),
#[cfg(feature = "md5")]
Fingerprint::MD5(_) => FingerprintStrategy::MD5,
#[cfg(feature = "sha256")]
@@ -539,8 +543,10 @@ pub enum FingerprintAlgorithm {
/// 64‑bit CRC‑64‑AVRO Rabin fingerprint.
#[default]
Rabin,
- /// Represents a fingerprint not based on a hash algorithm, (e.g., a
32-bit Schema Registry ID.)
- None,
+ /// Represents a 32 bit fingerprint not based on a hash algorithm, (e.g.,
a 32-bit Schema Registry ID.)
+ Id,
+ /// Represents a 64 bit fingerprint not based on a hash algorithm, (e.g.,
a 64-bit Schema Registry ID.)
+ Id64,
#[cfg(feature = "md5")]
/// 128-bit MD5 message digest.
MD5,
@@ -554,7 +560,8 @@ impl From<&Fingerprint> for FingerprintAlgorithm {
fn from(fp: &Fingerprint) -> Self {
match fp {
Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin,
- Fingerprint::Id(_) => FingerprintAlgorithm::None,
+ Fingerprint::Id(_) => FingerprintAlgorithm::Id,
+ Fingerprint::Id64(_) => FingerprintAlgorithm::Id64,
#[cfg(feature = "md5")]
Fingerprint::MD5(_) => FingerprintAlgorithm::MD5,
#[cfg(feature = "sha256")]
@@ -573,7 +580,8 @@ impl From<&FingerprintStrategy> for FingerprintAlgorithm {
fn from(s: &FingerprintStrategy) -> Self {
match s {
FingerprintStrategy::Rabin => FingerprintAlgorithm::Rabin,
- FingerprintStrategy::Id(_) => FingerprintAlgorithm::None,
+ FingerprintStrategy::Id(_) => FingerprintAlgorithm::Id,
+ FingerprintStrategy::Id64(_) => FingerprintAlgorithm::Id64,
#[cfg(feature = "md5")]
FingerprintStrategy::MD5 => FingerprintAlgorithm::MD5,
#[cfg(feature = "sha256")]
@@ -596,6 +604,8 @@ pub enum Fingerprint {
Rabin(u64),
/// A 32-bit Schema Registry ID.
Id(u32),
+ /// A 64-bit Schema Registry ID.
+ Id64(u64),
#[cfg(feature = "md5")]
/// A 128-bit MD5 fingerprint.
MD5([u8; 16]),
@@ -615,6 +625,7 @@ impl From<&FingerprintStrategy> for Fingerprint {
match s {
FingerprintStrategy::Rabin => Fingerprint::Rabin(0),
FingerprintStrategy::Id(id) => Fingerprint::Id(*id),
+ FingerprintStrategy::Id64(id) => Fingerprint::Id64(*id),
#[cfg(feature = "md5")]
FingerprintStrategy::MD5 => Fingerprint::MD5([0; 16]),
#[cfg(feature = "sha256")]
@@ -627,7 +638,8 @@ impl From<FingerprintAlgorithm> for Fingerprint {
fn from(s: FingerprintAlgorithm) -> Self {
match s {
FingerprintAlgorithm::Rabin => Fingerprint::Rabin(0),
- FingerprintAlgorithm::None => Fingerprint::Id(0),
+ FingerprintAlgorithm::Id => Fingerprint::Id(0),
+ FingerprintAlgorithm::Id64 => Fingerprint::Id64(0),
#[cfg(feature = "md5")]
FingerprintAlgorithm::MD5 => Fingerprint::MD5([0; 16]),
#[cfg(feature = "sha256")]
@@ -648,11 +660,24 @@ impl Fingerprint {
Fingerprint::Id(u32::from_be(id))
}
+ /// Loads the 64-bit Schema Registry fingerprint (Apicurio Schema Registry
ID).
+ ///
+ /// The provided `id` is in big-endian wire order; this converts it to
host order
+ /// and returns `Fingerprint::Id64`.
+ ///
+ /// # Returns
+ /// A `Fingerprint::Id64` variant containing the 64-bit fingerprint.
+ pub fn load_fingerprint_id64(id: u64) -> Self {
+ Fingerprint::Id64(u64::from_be(id))
+ }
+
/// Constructs a serialized prefix represented as a `Vec<u8>` based on the
variant of the enum.
///
/// This method serializes data in different formats depending on the
variant of `self`:
/// - **`Id(id)`**: Uses the Confluent wire format, which includes a
predefined magic header (`CONFLUENT_MAGIC`)
/// followed by the big-endian byte representation of the `id`.
+ /// - **`Id64(id)`**: Uses the Apicurio wire format, which includes a
predefined magic header (`CONFLUENT_MAGIC`)
+ /// followed by the big-endian 8-byte representation of the `id`.
/// - **`Rabin(val)`**: Uses the Avro single-object specification format.
This includes a different magic header
/// (`SINGLE_OBJECT_MAGIC`) followed by the little-endian byte
representation of the `val`.
/// - **`MD5(bytes)`** (optional, `md5` feature enabled): A non-standard
extension that adds the
@@ -673,6 +698,7 @@ impl Fingerprint {
let mut buf = [0u8; MAX_PREFIX_LEN];
let len = match self {
Self::Id(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC,
&val.to_be_bytes()),
+ Self::Id64(val) => write_prefix(&mut buf, &CONFLUENT_MAGIC,
&val.to_be_bytes()),
Self::Rabin(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC,
&val.to_le_bytes()),
#[cfg(feature = "md5")]
Self::MD5(val) => write_prefix(&mut buf, &SINGLE_OBJECT_MAGIC,
val),
@@ -798,7 +824,7 @@ impl SchemaStore {
/// A fingerprint is calculated for the given schema using the store's
configured
/// hash type. If a schema with the same fingerprint does not already
exist in the
/// store, the new schema is inserted. If the fingerprint already exists,
the
- /// existing schema is not overwritten. If FingerprintAlgorithm is set to
None, this
+ /// existing schema is not overwritten. If FingerprintAlgorithm is set to
Id or Id64, this
/// method will return an error. Confluent wire format implementations
should leverage the
/// set method instead.
///
@@ -811,7 +837,9 @@ impl SchemaStore {
/// A `Result` containing the `Fingerprint` of the schema if successful,
/// or an `ArrowError` on failure.
pub fn register(&mut self, schema: AvroSchema) -> Result<Fingerprint,
ArrowError> {
- if self.fingerprint_algorithm == FingerprintAlgorithm::None {
+ if self.fingerprint_algorithm == FingerprintAlgorithm::Id
+ || self.fingerprint_algorithm == FingerprintAlgorithm::Id64
+ {
return Err(ArrowError::SchemaError(
"Invalid FingerprintAlgorithm; unable to generate fingerprint.
\
Use the set method directly instead, providing a valid fingerprint"
@@ -2157,6 +2185,9 @@ mod tests {
Fingerprint::Id(_id) => {
unreachable!("This test should only generate Rabin
fingerprints")
}
+ Fingerprint::Id64(_id) => {
+ unreachable!("This test should only generate Rabin
fingerprints")
+ }
#[cfg(feature = "md5")]
Fingerprint::MD5(_id) => {
unreachable!("This test should only generate Rabin
fingerprints")
@@ -2180,6 +2211,39 @@ mod tests {
assert!(store.lookup(&Fingerprint::Id(id.wrapping_add(1))).is_none());
}
+ #[test]
+ fn test_set_and_lookup_id64() {
+ let mut store = SchemaStore::new();
+ let schema =
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap());
+ let id64: u64 = 0xDEAD_BEEF_DEAD_BEEF;
+ let fp = Fingerprint::Id64(id64);
+ let out_fp = store.set(fp, schema.clone()).unwrap();
+ assert_eq!(out_fp, fp, "set should return the same Id64 fingerprint");
+ assert_eq!(
+ store.lookup(&fp).cloned(),
+ Some(schema.clone()),
+ "lookup should find the schema by Id64"
+ );
+ assert!(
+ store
+ .lookup(&Fingerprint::Id64(id64.wrapping_add(1)))
+ .is_none(),
+ "lookup with a different Id64 must return None"
+ );
+ }
+
+ #[test]
+ fn test_fingerprint_id64_conversions() {
+ let algo_from_fp = FingerprintAlgorithm::from(&Fingerprint::Id64(123));
+ assert_eq!(algo_from_fp, FingerprintAlgorithm::Id64);
+ let fp_from_algo = Fingerprint::from(FingerprintAlgorithm::Id64);
+ assert!(matches!(fp_from_algo, Fingerprint::Id64(0)));
+ let strategy_from_fp = FingerprintStrategy::from(Fingerprint::Id64(5));
+ assert!(matches!(strategy_from_fp, FingerprintStrategy::Id64(0)));
+ let algo_from_strategy = FingerprintAlgorithm::from(strategy_from_fp);
+ assert_eq!(algo_from_strategy, FingerprintAlgorithm::Id64);
+ }
+
#[test]
fn test_register_duplicate_schema() {
let mut store = SchemaStore::new();
diff --git a/arrow-avro/src/writer/mod.rs b/arrow-avro/src/writer/mod.rs
index 2e89312cc2..6d5ace2230 100644
--- a/arrow-avro/src/writer/mod.rs
+++ b/arrow-avro/src/writer/mod.rs
@@ -133,6 +133,7 @@ impl WriterBuilder {
let maybe_fingerprint = if F::NEEDS_PREFIX {
match self.fingerprint_strategy {
Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
+ Some(FingerprintStrategy::Id64(id)) =>
Some(Fingerprint::Id64(id)),
Some(strategy) => {
Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?)
}
@@ -488,7 +489,7 @@ mod tests {
.build::<_, AvroBinaryFormat>(Vec::new())?;
writer.write(&batch)?;
let encoded = writer.into_inner();
- let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::None);
+ let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
let avro_schema = AvroSchema::try_from(&schema)?;
let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?;
let mut decoder = ReaderBuilder::new()
@@ -509,6 +510,40 @@ mod tests {
Ok(())
}
+ #[test]
+ fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), ArrowError>
{
+ let schema = Schema::new(vec![Field::new("a", DataType::Int32,
false)]);
+ let batch = RecordBatch::try_new(
+ Arc::new(schema.clone()),
+ vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
+ )?;
+ let schema_id: u64 = 42;
+ let mut writer = WriterBuilder::new(schema.clone())
+ .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
+ .build::<_, AvroBinaryFormat>(Vec::new())?;
+ writer.write(&batch)?;
+ let encoded = writer.into_inner();
+ let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
+ let avro_schema = AvroSchema::try_from(&schema)?;
+ let _ = store.set(Fingerprint::Id64(schema_id), avro_schema)?;
+ let mut decoder = ReaderBuilder::new()
+ .with_writer_schema_store(store)
+ .build_decoder()?;
+ let _ = decoder.decode(&encoded)?;
+ let decoded = decoder
+ .flush()?
+ .expect("expected at least one batch from decoder");
+ assert_eq!(decoded.num_columns(), 1);
+ assert_eq!(decoded.num_rows(), 3);
+ let col = decoded
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .expect("int column");
+ assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
+ Ok(())
+ }
+
#[test]
fn test_ocf_writer_generates_header_and_sync() -> Result<(), ArrowError> {
let batch = make_batch();