This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 4e18975 [feat] Support different header types (#190)
4e18975 is described below
commit 4e18975c3a6a1c224861f7667fd366c477ef7b5b
Author: Mason Gup <[email protected]>
AuthorDate: Mon Apr 28 17:02:14 2025 -0400
[feat] Support different header types (#190)
Issue #164
* Header trait and structs
* Tests and some cleanups
* Update names
* Add rustdocs
* Issue #190 - Minor improvements
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
* Fix for null schema
* Extract a constant for the GLUE_HEADER_LENGTH (== 18)
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
---------
Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
---
avro/src/error.rs | 5 +-
avro/src/headers.rs | 178 ++++++++++++++++++++++++++++++++++++++++++++++++++++
avro/src/lib.rs | 1 +
avro/src/reader.rs | 51 +++++++++------
avro/src/writer.rs | 64 ++++++++++++++-----
5 files changed, 263 insertions(+), 36 deletions(-)
diff --git a/avro/src/error.rs b/avro/src/error.rs
index 237995f..760f97d 100644
--- a/avro/src/error.rs
+++ b/avro/src/error.rs
@@ -410,7 +410,7 @@ pub enum Error {
     HeaderMagic,
 
     #[error("Message Header mismatch. Expected: {0:?}. Actual: {1:?}")]
-    SingleObjectHeaderMismatch([u8; 10], [u8; 10]),
+    SingleObjectHeaderMismatch(Vec<u8>, Vec<u8>),
 
     #[error("Failed to get JSON from avro.schema key in map")]
     GetAvroSchemaFromMap,
@@ -513,6 +513,11 @@ pub enum Error {
 
     #[error("Invalid Avro data! Cannot read codec type from value that is not Value::Bytes.")]
     BadCodecMetadata,
+
+    /// A byte slice could not be converted into a [`uuid::Uuid`], e.g. when reading the schema
+    /// UUID out of a message header.
+    #[error("Cannot convert a slice to Uuid: {0}")]
+    UuidFromSlice(#[source] uuid::Error),
 }
 
 #[derive(thiserror::Error, PartialEq)]
#[derive(thiserror::Error, PartialEq)]
diff --git a/avro/src/headers.rs b/avro/src/headers.rs
new file mode 100644
index 0000000..971b182
--- /dev/null
+++ b/avro/src/headers.rs
@@ -0,0 +1,219 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Handling of Avro magic headers
+use uuid::Uuid;
+
+use crate::{rabin::Rabin, schema::SchemaFingerprint, AvroResult, Schema};
+
+/// This trait represents that an object is able to construct an Avro message header. It is
+/// implemented for some known header types already. If you need a header type that is not already
+/// included here, then you can create your own struct and implement this trait.
+pub trait HeaderBuilder {
+    /// Build the bytes that prefix every single-object encoded message.
+    ///
+    /// [`crate::writer::GenericSingleObjectWriter`] writes these bytes before each encoded
+    /// datum, and [`crate::reader::GenericSingleObjectReader`] expects exactly these bytes at
+    /// the start of each message it reads.
+    fn build_header(&self) -> Vec<u8>;
+}
+
+/// HeaderBuilder based on the Rabin schema fingerprint
+///
+/// The header is the single-object marker `0xC3 0x01` followed by the 8-byte Rabin fingerprint
+/// of the schema. This is the default and will be used automatically by the `new` impls in
+/// [`crate::reader::GenericSingleObjectReader`] and [`crate::writer::GenericSingleObjectWriter`].
+pub struct RabinFingerprintHeader {
+    fingerprint: SchemaFingerprint,
+}
+
+impl RabinFingerprintHeader {
+    /// Use this helper to build an instance from an existing Avro `Schema`.
+    pub fn from_schema(schema: &Schema) -> Self {
+        let fingerprint = schema.fingerprint::<Rabin>();
+        RabinFingerprintHeader { fingerprint }
+    }
+}
+
+impl HeaderBuilder for RabinFingerprintHeader {
+    fn build_header(&self) -> Vec<u8> {
+        let mut header = vec![0xC3, 0x01];
+        // A Rabin fingerprint is 8 bytes long, which makes this header exactly 10 bytes.
+        header.extend_from_slice(&self.fingerprint.bytes[..8]);
+        header
+    }
+}
+
+/// HeaderBuilder based on
+/// [Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html) schema UUID
+///
+/// The header is the prefix `0x03 0x00` followed by the 16 bytes of the schema UUID.
+/// See the function docs for usage details
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct GlueSchemaUuidHeader {
+    schema_uuid: Uuid,
+}
+
+impl GlueSchemaUuidHeader {
+    /// The two bytes every Glue header starts with.
+    const GLUE_HEADER_PREFIX: [u8; 2] = [0x03, 0x00];
+
+    /// The minimum length of a Glue header.
+    /// 2 bytes for the special prefix (3, 0) plus
+    /// 16 bytes for the Uuid
+    const GLUE_HEADER_LENGTH: usize = 18;
+
+    /// Create an instance of the struct from a Glue Schema UUID
+    ///
+    /// Code for writing messages will most likely want to use this. You will need to determine
+    /// via other means the correct Glue schema UUID and use it with this method to be able to
+    /// create Avro-encoded messages with the correct headers.
+    pub fn from_uuid(schema_uuid: Uuid) -> Self {
+        GlueSchemaUuidHeader { schema_uuid }
+    }
+
+    /// Create an instance of the struct based on parsing the UUID out of the header of a raw
+    /// message
+    ///
+    /// Code for reading messages will most likely want to use this. Once you receive the raw bytes
+    /// of a message, use this function to build the struct from it. That struct can then be used
+    /// with the below `schema_uuid` function to retrieve the UUID in order to retrieve the correct
+    /// schema for the message. You can then use the raw message, the schema, and the struct
+    /// instance to read the message.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`crate::error::Error::HeaderMagic`] if `message_payload` is shorter than a Glue
+    /// header or does not start with the Glue prefix `0x03 0x00`.
+    pub fn parse_from_raw_avro(message_payload: &[u8]) -> AvroResult<Self> {
+        let header = message_payload
+            .get(..Self::GLUE_HEADER_LENGTH)
+            .filter(|header| header.starts_with(&Self::GLUE_HEADER_PREFIX))
+            .ok_or(crate::error::Error::HeaderMagic)?;
+        // Everything after the prefix is the 16-byte schema UUID.
+        let schema_uuid = Uuid::from_slice(&header[Self::GLUE_HEADER_PREFIX.len()..])
+            .map_err(crate::Error::UuidFromSlice)?;
+        Ok(GlueSchemaUuidHeader { schema_uuid })
+    }
+
+    /// Retrieve the UUID from the object
+    ///
+    /// This is most useful in conjunction with the `parse_from_raw_avro` function to retrieve the
+    /// actual UUID from the raw data of a received message.
+    pub fn schema_uuid(&self) -> Uuid {
+        self.schema_uuid
+    }
+}
+
+impl HeaderBuilder for GlueSchemaUuidHeader {
+    fn build_header(&self) -> Vec<u8> {
+        let mut output_vec = Vec::with_capacity(Self::GLUE_HEADER_LENGTH);
+        output_vec.extend_from_slice(&Self::GLUE_HEADER_PREFIX);
+        output_vec.extend_from_slice(self.schema_uuid.as_bytes());
+        output_vec
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use super::*;
+    use apache_avro_test_helper::TestResult;
+
+    #[test]
+    fn test_rabin_fingerprint_header() -> TestResult {
+        let schema_str = r#"
+        {
+            "type": "record",
+            "name": "test",
+            "fields": [
+                {
+                    "name": "a",
+                    "type": "long",
+                    "default": 42
+                },
+                {
+                    "name": "b",
+                    "type": "string"
+                }
+            ]
+        }
+        "#;
+        let schema = Schema::parse_str(schema_str)?;
+        let header_builder = RabinFingerprintHeader::from_schema(&schema);
+        let computed_header = header_builder.build_header();
+        let expected_header: Vec<u8> = vec![195, 1, 232, 198, 194, 12, 97, 95, 44, 71];
+        assert_eq!(computed_header, expected_header);
+        Ok(())
+    }
+
+    #[test]
+    fn test_glue_schema_header() -> TestResult {
+        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
+        let computed_header = header_builder.build_header();
+        let expected_header: Vec<u8> = vec![
+            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
+        ];
+        assert_eq!(computed_header, expected_header);
+        Ok(())
+    }
+
+    #[test]
+    fn test_glue_header_parse() -> TestResult {
+        let incoming_avro_message: Vec<u8> = vec![
+            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95, 65, 65, 65,
+        ];
+        let header_builder = GlueSchemaUuidHeader::parse_from_raw_avro(&incoming_avro_message)?;
+        let expected_schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        assert_eq!(header_builder.schema_uuid(), expected_schema_uuid);
+        Ok(())
+    }
+
+    #[test]
+    fn test_glue_header_parse_err_on_message_too_short() -> TestResult {
+        let incoming_message: Vec<u8> = vec![3, 0, 178, 241, 207, 0, 4, 52, 1];
+        let header_builder_res = GlueSchemaUuidHeader::parse_from_raw_avro(&incoming_message);
+        assert!(matches!(
+            header_builder_res,
+            Err(crate::error::Error::HeaderMagic)
+        ));
+        Ok(())
+    }
+
+    #[test]
+    fn test_glue_header_parse_err_on_wrong_prefix() -> TestResult {
+        // Same length as a Glue header, but it starts with the Rabin single-object marker.
+        let incoming_message: Vec<u8> = vec![
+            0xC3, 0x01, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
+        ];
+        let header_builder_res = GlueSchemaUuidHeader::parse_from_raw_avro(&incoming_message);
+        assert!(matches!(
+            header_builder_res,
+            Err(crate::error::Error::HeaderMagic)
+        ));
+        Ok(())
+    }
+
+    #[test]
+    fn test_glue_header_round_trip() -> TestResult {
+        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
+        let parsed = GlueSchemaUuidHeader::parse_from_raw_avro(&header_builder.build_header())?;
+        assert_eq!(parsed, header_builder);
+        Ok(())
+    }
+}
diff --git a/avro/src/lib.rs b/avro/src/lib.rs
index 247ab18..930541a 100644
--- a/avro/src/lib.rs
+++ b/avro/src/lib.rs
@@ -871,6 +871,7 @@ mod ser_schema;
mod util;
mod writer;
+pub mod headers;
pub mod rabin;
pub mod schema;
pub mod schema_compatibility;
diff --git a/avro/src/reader.rs b/avro/src/reader.rs
index 0830035..a70fffd 100644
--- a/avro/src/reader.rs
+++ b/avro/src/reader.rs
@@ -19,7 +19,7 @@
 use crate::{
     decode::{decode, decode_internal},
     from_value,
-    rabin::Rabin,
+    headers::{HeaderBuilder, RabinFingerprintHeader},
     schema::{
         resolve_names, resolve_names_with_schemata, AvroSchema, Names, ResolvedOwnedSchema,
         ResolvedSchema, Schema,
@@ -503,24 +503,31 @@ pub fn from_avro_datum_reader_schemata<R: Read>(
 
 pub struct GenericSingleObjectReader {
     write_schema: ResolvedOwnedSchema,
-    expected_header: [u8; 10],
+    expected_header: Vec<u8>,
 }
 
 impl GenericSingleObjectReader {
+    /// Build a reader for single-object encoded messages written with `schema`, expecting the
+    /// default header built by [`RabinFingerprintHeader`].
     pub fn new(schema: Schema) -> AvroResult<GenericSingleObjectReader> {
-        let fingerprint = schema.fingerprint::<Rabin>();
-        let expected_header = [
-            0xC3,
-            0x01,
-            fingerprint.bytes[0],
-            fingerprint.bytes[1],
-            fingerprint.bytes[2],
-            fingerprint.bytes[3],
-            fingerprint.bytes[4],
-            fingerprint.bytes[5],
-            fingerprint.bytes[6],
-            fingerprint.bytes[7],
-        ];
+        let header_builder = RabinFingerprintHeader::from_schema(&schema);
+        Self::new_with_header_builder(schema, header_builder)
+    }
+
+    /// Build a reader for single-object encoded messages written with `schema`, expecting every
+    /// message to start with the bytes built by `header_builder`.
+    ///
+    /// Use this for header formats other than the default Rabin fingerprint one, such as
+    /// [`crate::headers::GlueSchemaUuidHeader`].
+    ///
+    /// # Errors
+    ///
+    /// Fails if `schema` cannot be turned into a [`ResolvedOwnedSchema`].
+    pub fn new_with_header_builder<HB: HeaderBuilder>(
+        schema: Schema,
+        header_builder: HB,
+    ) -> AvroResult<GenericSingleObjectReader> {
+        let expected_header = header_builder.build_header();
         Ok(GenericSingleObjectReader {
             write_schema: ResolvedOwnedSchema::try_from(schema)?,
             expected_header,
@@ -528,7 +535,7 @@ impl GenericSingleObjectReader {
     }
 
     pub fn read_value<R: Read>(&self, reader: &mut R) -> AvroResult<Value> {
-        let mut header: [u8; 10] = [0; 10];
+        let mut header = vec![0; self.expected_header.len()];
         match reader.read_exact(&mut header) {
             Ok(_) => {
                 if self.expected_header == header {
@@ -540,7 +547,7 @@ impl GenericSingleObjectReader {
                     )
                 } else {
                     Err(Error::SingleObjectHeaderMismatch(
-                        self.expected_header,
+                        self.expected_header.clone(),
                         header,
                     ))
                 }
@@ -602,11 +609,12 @@ pub fn read_marker(bytes: &[u8]) -> [u8; 16] {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::{encode::encode, types::Record};
+    use crate::{encode::encode, headers::GlueSchemaUuidHeader, rabin::Rabin, types::Record};
     use apache_avro_test_helper::TestResult;
     use pretty_assertions::assert_eq;
     use serde::Deserialize;
     use std::io::Cursor;
+    use uuid::Uuid;
 
     const SCHEMA: &str = r#"
     {
@@ -1034,6 +1042,46 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn avro_rs_164_generic_reader_alternate_header() -> TestResult {
+        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
+        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
+            TestSingleObjectReader::get_schema(),
+            header_builder,
+        )
+        .expect("failed to build reader");
+        let data_to_read: Vec<u8> = vec![
+            3, 0, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
+        ];
+        let mut to_read = &data_to_read[..];
+        let read_result = generic_reader.read_value(&mut to_read);
+        // The message holds only a Glue header and no datum, so reading must fail, but while
+        // decoding the missing body rather than while checking the header.
+        assert!(read_result.is_err(), "a header without a datum must not decode");
+        assert!(
+            !matches!(read_result, Err(Error::SingleObjectHeaderMismatch(..))),
+            "the Glue header should have been accepted"
+        );
+        Ok(())
+    }
+
+    #[test]
+    fn avro_rs_164_generic_reader_rejects_other_header() -> TestResult {
+        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        let generic_reader = GenericSingleObjectReader::new_with_header_builder(
+            TestSingleObjectReader::get_schema(),
+            GlueSchemaUuidHeader::from_uuid(schema_uuid),
+        )?;
+        // Same length as a Glue header, but it starts with the Rabin single-object marker.
+        let data_to_read: Vec<u8> = vec![
+            0xC3, 0x01, 178, 241, 207, 0, 4, 52, 1, 62, 67, 154, 18, 94, 184, 72, 90, 95,
+        ];
+        let read_result = generic_reader.read_value(&mut &data_to_read[..]);
+        assert!(matches!(read_result, Err(Error::SingleObjectHeaderMismatch(..))));
+        Ok(())
+    }
+
     #[cfg(not(feature = "snappy"))]
     #[test]
     fn test_avro_3549_read_not_enabled_codec() {
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index ed41f0e..7406e48 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -18,14 +18,14 @@
 //! Logic handling writing in Avro format at user level.
 use crate::{
     encode::{encode, encode_internal, encode_to_vec},
-    rabin::Rabin,
+    headers::{HeaderBuilder, RabinFingerprintHeader},
     schema::{AvroSchema, Name, ResolvedOwnedSchema, ResolvedSchema, Schema},
     ser_schema::SchemaAwareWriteSerializer,
     types::Value,
     AvroResult, Codec, Error,
 };
 use serde::Serialize;
-use std::{collections::HashMap, io::Write, marker::PhantomData};
+use std::{collections::HashMap, io::Write, marker::PhantomData, ops::RangeInclusive};
 
 const DEFAULT_BLOCK_SIZE: usize = 16000;
 const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
@@ -488,20 +488,30 @@ impl GenericSingleObjectWriter {
         schema: &Schema,
         initial_buffer_cap: usize,
     ) -> AvroResult<GenericSingleObjectWriter> {
-        let fingerprint = schema.fingerprint::<Rabin>();
+        let header_builder = RabinFingerprintHeader::from_schema(schema);
+        Self::new_with_capacity_and_header_builder(schema, initial_buffer_cap, header_builder)
+    }
+
+    /// Build a writer whose messages start with the header built by `header_builder` instead
+    /// of the default Rabin fingerprint header.
+    ///
+    /// `initial_buffer_cap` is the initial capacity of the internal buffer.
+    ///
+    /// # Errors
+    ///
+    /// Among other errors, returns [`Error::IllegalSingleObjectWriterState`] if the built
+    /// header is shorter than 10 or longer than 20 bytes.
+    pub fn new_with_capacity_and_header_builder<HB: HeaderBuilder>(
+        schema: &Schema,
+        initial_buffer_cap: usize,
+        header_builder: HB,
+    ) -> AvroResult<GenericSingleObjectWriter> {
+        let header = header_builder.build_header();
+        // Fail fast: `write_value_ref` would refuse every write with a header of another length.
+        if !Self::HEADER_LENGTH_RANGE.contains(&header.len()) {
+            return Err(Error::IllegalSingleObjectWriterState);
+        }
         let mut buffer = Vec::with_capacity(initial_buffer_cap);
-        let header = [
-            0xC3,
-            0x01,
-            fingerprint.bytes[0],
-            fingerprint.bytes[1],
-            fingerprint.bytes[2],
-            fingerprint.bytes[3],
-            fingerprint.bytes[4],
-            fingerprint.bytes[5],
-            fingerprint.bytes[6],
-            fingerprint.bytes[7],
-        ];
         buffer.extend_from_slice(&header);
 
         Ok(GenericSingleObjectWriter {
@@ -510,15 +520,24 @@ impl GenericSingleObjectWriter {
         })
     }
 
+    /// Header lengths accepted by this writer; covers the 10-byte Rabin fingerprint header and
+    /// the 18-byte Glue schema UUID header.
+    const HEADER_LENGTH_RANGE: RangeInclusive<usize> = 10_usize..=20_usize;
+
     /// Write the referenced Value to the provided Write object. Returns a result with the number of bytes written including the header
+    ///
+    /// The internal buffer is reset to just the header before returning, even if encoding or
+    /// writing fails, so a failed call cannot corrupt later messages.
     pub fn write_value_ref<W: Write>(&mut self, v: &Value, writer: &mut W) -> AvroResult<usize> {
-        if self.buffer.len() != 10 {
-            Err(Error::IllegalSingleObjectWriterState)
-        } else {
-            write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)?;
-            writer.write_all(&self.buffer).map_err(Error::WriteBytes)?;
-            let len = self.buffer.len();
-            self.buffer.truncate(10);
-            Ok(len)
-        }
+        let original_length = self.buffer.len();
+        if !Self::HEADER_LENGTH_RANGE.contains(&original_length) {
+            return Err(Error::IllegalSingleObjectWriterState);
+        }
+        let outcome = write_value_ref_owned_resolved(&self.resolved, v, &mut self.buffer)
+            .and_then(|_| writer.write_all(&self.buffer).map_err(Error::WriteBytes));
+        let len = self.buffer.len();
+        // Always drop the encoded datum: a leftover from a failed call could still fit in
+        // HEADER_LENGTH_RANGE and would then be silently prepended to the next message.
+        self.buffer.truncate(original_length);
+        outcome.map(|_| len)
     }
@@ -701,6 +720,8 @@ mod tests {
     use crate::{
         decimal::Decimal,
         duration::{Days, Duration, Millis, Months},
+        headers::GlueSchemaUuidHeader,
+        rabin::Rabin,
         schema::{DecimalSchema, FixedSchema, Name},
         types::Record,
         util::zig_i64,
@@ -708,6 +729,7 @@ mod tests {
     };
     use pretty_assertions::assert_eq;
     use serde::{Deserialize, Serialize};
+    use uuid::Uuid;
 
     use crate::codec::DeflateSettings;
     use apache_avro_test_helper::TestResult;
@@ -1384,6 +1406,94 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn test_single_object_writer_with_header_builder() -> TestResult {
+        let mut buf: Vec<u8> = Vec::new();
+        let obj = TestSingleObjectWriter {
+            a: 300,
+            b: 34.555,
+            c: vec!["cat".into(), "dog".into()],
+        };
+        let schema_uuid = Uuid::parse_str("b2f1cf00-0434-013e-439a-125eb8485a5f")?;
+        let header_builder = GlueSchemaUuidHeader::from_uuid(schema_uuid);
+        let mut writer = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
+            &TestSingleObjectWriter::get_schema(),
+            1024,
+            header_builder,
+        )
+        .expect("Should resolve schema");
+        let value = obj.into();
+        writer
+            .write_value_ref(&value, &mut buf)
+            .expect("Error serializing properly");
+
+        assert_eq!(buf[0], 0x03);
+        assert_eq!(buf[1], 0x00);
+        assert_eq!(buf[2..18], schema_uuid.into_bytes()[..]);
+        Ok(())
+    }
+
+    #[test]
+    fn test_single_object_writer_rejects_unsupported_header_length() -> TestResult {
+        struct OversizedHeader;
+        impl HeaderBuilder for OversizedHeader {
+            fn build_header(&self) -> Vec<u8> {
+                vec![0; 32]
+            }
+        }
+
+        let result = GenericSingleObjectWriter::new_with_capacity_and_header_builder(
+            &TestSingleObjectWriter::get_schema(),
+            1024,
+            OversizedHeader,
+        );
+        assert!(matches!(result, Err(Error::IllegalSingleObjectWriterState)));
+        Ok(())
+    }
+
+    #[test]
+    fn test_single_object_writer_recovers_after_failed_write() -> TestResult {
+        struct FailingSink;
+        impl Write for FailingSink {
+            fn write(&mut self, _buf: &[u8]) -> std::io::Result<usize> {
+                Err(std::io::ErrorKind::BrokenPipe.into())
+            }
+
+            fn flush(&mut self) -> std::io::Result<()> {
+                Ok(())
+            }
+        }
+
+        let obj = TestSingleObjectWriter {
+            a: 300,
+            b: 34.555,
+            c: vec!["cat".into(), "dog".into()],
+        };
+        let value: Value = obj.into();
+        let schema = TestSingleObjectWriter::get_schema();
+        let new_writer = || {
+            GenericSingleObjectWriter::new_with_capacity_and_header_builder(
+                &schema,
+                1024,
+                RabinFingerprintHeader::from_schema(&schema),
+            )
+        };
+
+        // A failed write must not leave the encoded datum behind in the internal buffer.
+        let mut writer = new_writer()?;
+        assert!(writer.write_value_ref(&value, &mut FailingSink).is_err());
+        let mut recovered: Vec<u8> = Vec::new();
+        writer.write_value_ref(&value, &mut recovered)?;
+
+        let mut expected: Vec<u8> = Vec::new();
+        new_writer()?.write_value_ref(&value, &mut expected)?;
+        assert_eq!(recovered, expected);
+        Ok(())
+    }
+
     #[test]
     fn test_writer_parity() -> TestResult {
         let obj1 = TestSingleObjectWriter {