This is an automated email from the ASF dual-hosted git repository.
kriskras99 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 086bc67 feat: Duration inner fixed schema and serialization (#382)
086bc67 is described below
commit 086bc6701b5c2ca49cfdd5068ac0ed9a1757ea93
Author: jdarais <[email protected]>
AuthorDate: Wed Jan 7 02:16:50 2026 -0800
feat: Duration inner fixed schema and serialization (#382)
* Add Duration to Deserializer, Add FixedSchema to Schema::Duration
* Deprecate SchemaKind::is_named in favor of Schema::is_named
* formatting
* fmt and clippy fixes
* test names and DurationVisitor cleanup, fix Deserializer
deserialize_bytes for Duration
* fix duratin schema serialization and test names
* add check for size in duration schema in decode_internal
* PR feedback fixes
* AvroSchemaComponent for core::time::Duration, plus minor cleanup
* add missing cases for Uuid,Decimal,Duration for deserialize_byte_buf
---------
Co-authored-by: Jeremiah Darais <[email protected]>
---
avro/src/decode.rs | 16 +++--
avro/src/duration.rs | 100 +++++++++++++++++++++++++++++--
avro/src/error.rs | 2 +-
avro/src/schema.rs | 123 ++++++++++++++++++++++++++++++++-------
avro/src/schema_compatibility.rs | 2 +-
avro/src/schema_equality.rs | 112 ++++++++++++++++++++++++++++++++++-
avro/src/serde/de.rs | 32 +++++++---
avro/src/serde/ser_schema.rs | 4 +-
avro/src/types.rs | 55 ++++++++++++++---
avro/src/writer.rs | 9 ++-
10 files changed, 400 insertions(+), 55 deletions(-)
diff --git a/avro/src/decode.rs b/avro/src/decode.rs
index 06c59e6..b5b828c 100644
--- a/avro/src/decode.rs
+++ b/avro/src/decode.rs
@@ -179,10 +179,18 @@ pub(crate) fn decode_internal<R: Read, S: Borrow<Schema>>(
Schema::LocalTimestampMillis =>
zag_i64(reader).map(Value::LocalTimestampMillis),
Schema::LocalTimestampMicros =>
zag_i64(reader).map(Value::LocalTimestampMicros),
Schema::LocalTimestampNanos =>
zag_i64(reader).map(Value::LocalTimestampNanos),
- Schema::Duration => {
- let mut buf = [0u8; 12];
- reader.read_exact(&mut buf).map_err(Details::ReadDuration)?;
- Ok(Value::Duration(Duration::from(buf)))
+ Schema::Duration(fixed_schema) => {
+ if fixed_schema.size == 12 {
+ let mut buf = [0u8; 12];
+ reader.read_exact(&mut buf).map_err(Details::ReadDuration)?;
+ Ok(Value::Duration(Duration::from(buf)))
+ } else {
+ Err(Details::CompareFixedSizes {
+ size: 12,
+ n: fixed_schema.size,
+ }
+ .into())
+ }
}
Schema::Float => {
let mut buf = [0u8; std::mem::size_of::<f32>()];
diff --git a/avro/src/duration.rs b/avro/src/duration.rs
index 713c9f8..ebaa469 100644
--- a/avro/src/duration.rs
+++ b/avro/src/duration.rs
@@ -17,6 +17,8 @@
/// A struct representing duration that hides the details of endianness and
conversion between
/// platform-native u32 and byte arrays.
+use serde::{Deserialize, Serialize, de};
+
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub struct Duration {
months: Months,
@@ -125,8 +127,8 @@ impl Duration {
}
}
-impl From<Duration> for [u8; 12] {
- fn from(duration: Duration) -> Self {
+impl From<&Duration> for [u8; 12] {
+ fn from(duration: &Duration) -> Self {
let mut bytes = [0u8; 12];
bytes[0..4].copy_from_slice(&duration.months.as_bytes());
bytes[4..8].copy_from_slice(&duration.days.as_bytes());
@@ -135,8 +137,14 @@ impl From<Duration> for [u8; 12] {
}
}
-impl From<[u8; 12]> for Duration {
- fn from(bytes: [u8; 12]) -> Self {
+impl From<Duration> for [u8; 12] {
+ fn from(duration: Duration) -> Self {
+ (&duration).into()
+ }
+}
+
+impl From<&[u8; 12]> for Duration {
+ fn from(bytes: &[u8; 12]) -> Self {
Self {
months: Months::from([bytes[0], bytes[1], bytes[2], bytes[3]]),
days: Days::from([bytes[4], bytes[5], bytes[6], bytes[7]]),
@@ -144,3 +152,87 @@ impl From<[u8; 12]> for Duration {
}
}
}
+
+impl From<[u8; 12]> for Duration {
+ fn from(bytes: [u8; 12]) -> Duration {
+ (&bytes).into()
+ }
+}
+
+impl Serialize for Duration {
+ fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+ where
+ S: serde::Serializer,
+ {
+ let value_bytes: [u8; 12] = self.into();
+ serializer.serialize_bytes(&value_bytes)
+ }
+}
+
+impl<'de> Deserialize<'de> for Duration {
+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+ where
+ D: serde::Deserializer<'de>,
+ {
+ struct DurationVisitor;
+
+ impl de::Visitor<'_> for DurationVisitor {
+ type Value = Duration;
+
+ fn expecting(&self, f: &mut std::fmt::Formatter) -> Result<(),
std::fmt::Error> {
+ write!(f, "a byte array with size 12")
+ }
+
+ fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E>
+ where
+ E: de::Error,
+ {
+ if v.len() != 12 {
+ Err(E::custom(format!(
+ "Expected byte array of length 12, but length is {}",
+ v.len()
+ )))
+ } else {
+ let v_slice: [u8; 12] = v[..12].try_into().unwrap();
+ Ok(Duration::from(v_slice))
+ }
+ }
+ }
+
+ deserializer.deserialize_bytes(DurationVisitor)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::types::Value;
+ use anyhow::anyhow;
+ use apache_avro_test_helper::TestResult;
+
+ #[test]
+ fn avro_rs_382_duration_from_value() -> TestResult {
+ let val = Value::Duration(Duration::new(Months::new(7), Days::new(4),
Millis::new(45)));
+ let de_val: Duration = crate::from_value(&val)?;
+ assert_eq!(de_val.months(), Months::new(7));
+ assert_eq!(de_val.days(), Days::new(4));
+ assert_eq!(de_val.millis(), Millis::new(45));
+ Ok(())
+ }
+
+ #[test]
+ fn avro_rs_382_duration_to_value() -> TestResult {
+ let duration = Duration::new(Months::new(7), Days::new(4),
Millis::new(45));
+ let ser_val = crate::to_value(duration)?;
+ match ser_val {
+ // Without a schema, we expect Duration to serialize to bytes
+ Value::Bytes(b) => {
+ assert_eq!(b, vec![7, 0, 0, 0, 4, 0, 0, 0, 45, 0, 0, 0]);
+ }
+ _ => {
+ Err(anyhow!("Expected a Bytes value but got {:?}", ser_val))?;
+ }
+ }
+ Ok(())
+ }
+}
diff --git a/avro/src/error.rs b/avro/src/error.rs
index bdb2055..b2b0211 100644
--- a/avro/src/error.rs
+++ b/avro/src/error.rs
@@ -179,7 +179,7 @@ pub enum Details {
GetBigDecimal(Value),
#[error("Fixed bytes of size 12 expected, got Fixed of size {0}")]
- GetDecimalFixedBytes(usize),
+ GetDurationFixedBytes(usize),
#[error("Expected Value::Duration or Value::Fixed(12), got: {0:?}")]
ResolveDuration(Value),
diff --git a/avro/src/schema.rs b/avro/src/schema.rs
index a7ea493..78dcebd 100644
--- a/avro/src/schema.rs
+++ b/avro/src/schema.rs
@@ -134,7 +134,7 @@ pub enum Schema {
/// An instant in local time represented as the number of nanoseconds
after the UNIX epoch.
LocalTimestampNanos,
/// An amount of time defined by a number of months, days and milliseconds.
- Duration,
+ Duration(FixedSchema),
/// A reference to another schema.
Ref { name: Name },
}
@@ -176,10 +176,15 @@ impl SchemaKind {
)
}
+ #[deprecated(since = "0.22.0", note = "Use Schema::is_named instead")]
pub fn is_named(self) -> bool {
matches!(
self,
- SchemaKind::Record | SchemaKind::Enum | SchemaKind::Fixed |
SchemaKind::Ref
+ SchemaKind::Record
+ | SchemaKind::Enum
+ | SchemaKind::Fixed
+ | SchemaKind::Ref
+ | SchemaKind::Duration
)
}
}
@@ -493,7 +498,8 @@ impl<'s> ResolvedSchema<'s> {
| Schema::Decimal(DecimalSchema {
inner: InnerDecimalSchema::Fixed(FixedSchema { name, .. }),
..
- }) => {
+ })
+ | Schema::Duration(FixedSchema { name, .. }) => {
let fully_qualified_name =
name.fully_qualified_name(enclosing_namespace);
if self
.names_ref
@@ -979,8 +985,7 @@ impl UnionSchema {
if let Schema::Union(_) = schema {
return Err(Details::GetNestedUnion.into());
}
- let kind = SchemaKind::from(schema);
- if !kind.is_named() && vindex.insert(kind, i).is_some() {
+ if !schema.is_named() && vindex.insert(SchemaKind::from(schema),
i).is_some() {
return Err(Details::GetUnionDuplicate.into());
}
}
@@ -1267,10 +1272,28 @@ impl Schema {
..
})
| Schema::Uuid(UuidSchema::Fixed(FixedSchema { attributes, .. }))
=> Some(attributes),
+ Schema::Duration(FixedSchema { attributes, .. }) =>
Some(attributes),
_ => None,
}
}
+ /// Returns whether the schema represents a named type according to the
avro specification
+ pub fn is_named(&self) -> bool {
+ matches!(
+ self,
+ Schema::Ref { .. }
+ | Schema::Record(_)
+ | Schema::Enum(_)
+ | Schema::Fixed(_)
+ | Schema::Decimal(DecimalSchema {
+ inner: InnerDecimalSchema::Fixed(_),
+ ..
+ })
+ | Schema::Uuid(UuidSchema::Fixed(_))
+ | Schema::Duration(_)
+ )
+ }
+
/// Returns the name of the schema if it has one.
pub fn name(&self) -> Option<&Name> {
match self {
@@ -1282,7 +1305,8 @@ impl Schema {
inner: InnerDecimalSchema::Fixed(FixedSchema { name, .. }),
..
})
- | Schema::Uuid(UuidSchema::Fixed(FixedSchema { name, .. })) =>
Some(name),
+ | Schema::Uuid(UuidSchema::Fixed(FixedSchema { name, .. }))
+ | Schema::Duration(FixedSchema { name, .. }) => Some(name),
_ => None,
}
}
@@ -1303,6 +1327,7 @@ impl Schema {
..
})
| Schema::Uuid(UuidSchema::Fixed(FixedSchema { aliases, .. })) =>
aliases.as_ref(),
+ Schema::Duration(FixedSchema { aliases, .. }) => aliases.as_ref(),
_ => None,
}
}
@@ -1318,6 +1343,7 @@ impl Schema {
..
})
| Schema::Uuid(UuidSchema::Fixed(FixedSchema { doc, .. })) =>
doc.as_ref(),
+ Schema::Duration(FixedSchema { doc, .. }) => doc.as_ref(),
_ => None,
}
}
@@ -1748,7 +1774,25 @@ impl Parser {
"duration",
parse_as_native_complex(complex, self,
enclosing_namespace)?,
&[SchemaKind::Fixed],
- |_| -> AvroResult<Schema> { Ok(Schema::Duration) },
+ |schema| -> AvroResult<Schema> {
+ match schema {
+ Schema::Fixed(fixed @ FixedSchema { size: 12,
.. }) => {
+ Ok(Schema::Duration(fixed))
+ }
+ Schema::Fixed(FixedSchema { size, .. }) => {
+ warn!(
+ "Ignoring duration logical type on
fixed type because size ({size}) is not 12! Schema: {schema:?}"
+ );
+ Ok(schema)
+ }
+ _ => {
+ warn!(
+ "Ignoring invalid duration logical
type for schema: {schema:?}"
+ );
+ Ok(schema)
+ }
+ }
+ },
);
}
// In this case, of an unknown logical type, we just pass
through the underlying
@@ -2348,20 +2392,10 @@ impl Serialize for Schema {
map.serialize_entry("logicalType", "local-timestamp-nanos")?;
map.end()
}
- Schema::Duration => {
- let mut map = serializer.serialize_map(None)?;
+ Schema::Duration(fixed) => {
+ let map = serializer.serialize_map(None)?;
- // the Avro doesn't indicate what the name of the underlying
fixed type of a
- // duration should be or typically is.
- let inner = Schema::Fixed(FixedSchema {
- name: Name::new("duration").unwrap(),
- aliases: None,
- doc: None,
- size: 12,
- default: None,
- attributes: Default::default(),
- });
- map.serialize_entry("type", &inner)?;
+ let mut map = fixed.serialize_to_map::<S>(map)?;
map.serialize_entry("logicalType", "duration")?;
map.end()
}
@@ -2629,7 +2663,6 @@ pub mod derive {
impl_schema!(f64, Schema::Double);
impl_schema!(String, Schema::String);
impl_schema!(uuid::Uuid, Schema::Uuid(UuidSchema::String));
- impl_schema!(core::time::Duration, Schema::Duration);
impl<T> AvroSchemaComponent for Vec<T>
where
@@ -2722,6 +2755,29 @@ pub mod derive {
T::get_schema_in_ctxt(named_schemas, enclosing_namespace)
}
}
+
+ impl AvroSchemaComponent for core::time::Duration {
+ fn get_schema_in_ctxt(
+ named_schemas: &mut Names,
+ enclosing_namespace: &Namespace,
+ ) -> Schema {
+ let name = Name {
+ name: "duration".to_string(),
+ namespace: enclosing_namespace.clone(),
+ };
+ named_schemas
+ .entry(name.clone())
+ .or_insert(Schema::Duration(FixedSchema {
+ name,
+ aliases: None,
+ doc: None,
+ size: 12,
+ default: None,
+ attributes: Default::default(),
+ }))
+ .clone()
+ }
+ }
}
#[cfg(test)]
@@ -7470,4 +7526,29 @@ mod tests {
let _resolved = ResolvedSchema::try_from(&schema).unwrap();
let _resolved_owned = ResolvedOwnedSchema::try_from(schema).unwrap();
}
+
+ #[test]
+ fn avro_rs_382_serialize_duration_schema() -> TestResult {
+ let schema = Schema::Duration(FixedSchema {
+ name: Name::from("Duration"),
+ aliases: None,
+ doc: None,
+ size: 12,
+ default: None,
+ attributes: BTreeMap::new(),
+ });
+
+ let expected_schema_json = json!({
+ "type": "fixed",
+ "logicalType": "duration",
+ "name": "Duration",
+ "size": 12
+ });
+
+ let schema_json = serde_json::to_value(&schema)?;
+
+ assert_eq!(&schema_json, &expected_schema_json);
+
+ Ok(())
+ }
}
diff --git a/avro/src/schema_compatibility.rs b/avro/src/schema_compatibility.rs
index 2b21eee..560302b 100644
--- a/avro/src/schema_compatibility.rs
+++ b/avro/src/schema_compatibility.rs
@@ -493,7 +493,7 @@ impl SchemaCompatibility {
| Schema::LocalTimestampMillis
| Schema::LocalTimestampMicros
| Schema::LocalTimestampNanos
- | Schema::Duration
+ | Schema::Duration(_)
| Schema::Ref { .. } => {
unreachable!("SchemaKind::Uuid can only be a
Schema::Uuid")
}
diff --git a/avro/src/schema_equality.rs b/avro/src/schema_equality.rs
index b351584..1811ba5 100644
--- a/avro/src/schema_equality.rs
+++ b/avro/src/schema_equality.rs
@@ -88,8 +88,6 @@ impl SchemataEq for StructFieldEq {
(Schema::BigDecimal, _) => false,
(Schema::Date, Schema::Date) => true,
(Schema::Date, _) => false,
- (Schema::Duration, Schema::Duration) => true,
- (Schema::Duration, _) => false,
(Schema::TimeMicros, Schema::TimeMicros) => true,
(Schema::TimeMicros, _) => false,
(Schema::TimeMillis, Schema::TimeMillis) => true,
@@ -166,6 +164,8 @@ impl SchemataEq for StructFieldEq {
self.compare(items_one, items_two)
}
(Schema::Array(_), _) => false,
+ (Schema::Duration(FixedSchema { size: size_one, ..}),
Schema::Duration(FixedSchema { size: size_two, ..})) => size_one == size_two,
+ (Schema::Duration(_), _) => false,
(
Schema::Map(MapSchema { types: types_one, ..}),
Schema::Map(MapSchema { types: types_two, ..})
@@ -234,6 +234,9 @@ mod tests {
const STRUCT_FIELD_EQ: StructFieldEq = StructFieldEq {
include_attributes: false,
};
+ const STRUCT_FIELD_EQ_WITH_ATTRS: StructFieldEq = StructFieldEq {
+ include_attributes: true,
+ };
macro_rules! test_primitives {
($primitive:ident) => {
@@ -258,7 +261,6 @@ mod tests {
test_primitives!(String);
test_primitives!(BigDecimal);
test_primitives!(Date);
- test_primitives!(Duration);
test_primitives!(TimeMicros);
test_primitives!(TimeMillis);
test_primitives!(TimestampMicros);
@@ -268,6 +270,110 @@ mod tests {
test_primitives!(LocalTimestampMillis);
test_primitives!(LocalTimestampNanos);
+ #[test]
+ fn avro_rs_382_compare_schemata_duration_equal() {
+ let schema_one = Schema::Duration(FixedSchema {
+ name: Name::from("name1"),
+ size: 12,
+ aliases: None,
+ doc: None,
+ default: None,
+ attributes: BTreeMap::new(),
+ });
+ let schema_two = Schema::Duration(FixedSchema {
+ name: Name::from("name1"),
+ size: 12,
+ aliases: None,
+ doc: None,
+ default: None,
+ attributes: BTreeMap::new(),
+ });
+ let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one,
&schema_two);
+ let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one,
&schema_two);
+ assert_eq!(specification_eq_res, struct_field_eq_res)
+ }
+
+ #[test]
+ fn avro_rs_382_compare_schemata_duration_different_names() {
+ let schema_one = Schema::Duration(FixedSchema {
+ name: Name::from("name1"),
+ size: 12,
+ aliases: None,
+ doc: None,
+ default: None,
+ attributes: BTreeMap::new(),
+ });
+ let schema_two = Schema::Duration(FixedSchema {
+ name: Name::from("name2"),
+ size: 12,
+ aliases: None,
+ doc: None,
+ default: None,
+ attributes: BTreeMap::new(),
+ });
+ let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one,
&schema_two);
+ assert!(!specification_eq_res);
+
+ let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one,
&schema_two);
+ assert!(!struct_field_eq_res)
+ }
+
+ #[test]
+ fn avro_rs_382_compare_schemata_duration_different_attributes() {
+ let schema_one = Schema::Duration(FixedSchema {
+ name: Name::from("name1"),
+ size: 12,
+ aliases: None,
+ doc: None,
+ default: None,
+ attributes: vec![(String::from("attr1"),
serde_json::Value::Bool(true))]
+ .into_iter()
+ .collect(),
+ });
+ let schema_two = Schema::Duration(FixedSchema {
+ name: Name::from("name1"),
+ size: 12,
+ aliases: None,
+ doc: None,
+ default: None,
+ attributes: BTreeMap::new(),
+ });
+ let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one,
&schema_two);
+ assert!(specification_eq_res);
+
+ let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one,
&schema_two);
+ assert!(struct_field_eq_res);
+
+ let struct_field_eq_with_attrs_res =
+ STRUCT_FIELD_EQ_WITH_ATTRS.compare(&schema_one, &schema_two);
+ assert!(!struct_field_eq_with_attrs_res);
+ }
+
+ #[test]
+ fn avro_rs_382_compare_schemata_duration_different_sizes() {
+ let schema_one = Schema::Duration(FixedSchema {
+ name: Name::from("name1"),
+ size: 8,
+ aliases: None,
+ doc: None,
+ default: None,
+ attributes: BTreeMap::new(),
+ });
+ let schema_two = Schema::Duration(FixedSchema {
+ name: Name::from("name1"),
+ size: 12,
+ aliases: None,
+ doc: None,
+ default: None,
+ attributes: BTreeMap::new(),
+ });
+ let specification_eq_res = SPECIFICATION_EQ.compare(&schema_one,
&schema_two);
+ assert!(!specification_eq_res);
+
+ let struct_field_eq_res = STRUCT_FIELD_EQ.compare(&schema_one,
&schema_two);
+ assert!(!struct_field_eq_res);
+ }
+
#[test]
fn test_avro_3939_compare_named_schemata_with_different_names() {
let schema_one = Schema::Ref {
diff --git a/avro/src/serde/de.rs b/avro/src/serde/de.rs
index c2007b5..ce42576 100644
--- a/avro/src/serde/de.rs
+++ b/avro/src/serde/de.rs
@@ -354,9 +354,13 @@ impl<'de> de::Deserializer<'de> for &Deserializer<'de> {
Value::BigDecimal(ref big_decimal) => {
visitor.visit_str(big_decimal.to_plain_string().as_str())
}
- _ => Err(de::Error::custom(format!(
- "unsupported union: {:?}",
- self.input
+ Value::Duration(ref duration) => {
+ let duration_bytes: [u8; 12] = duration.into();
+ visitor.visit_bytes(&duration_bytes[..])
+ }
+ Value::Union(_, _) => Err(de::Error::custom(format!(
+ "Directly nested union types are not supported. Got {:?}",
+ &**u
))),
},
Value::Record(fields) =>
visitor.visit_map(RecordDeserializer::new(fields)),
@@ -370,10 +374,10 @@ impl<'de> de::Deserializer<'de> for &Deserializer<'de> {
Value::BigDecimal(big_decimal) => {
visitor.visit_str(big_decimal.to_plain_string().as_str())
}
- value => Err(de::Error::custom(format!(
- "incorrect value of type: {:?}",
- crate::schema::SchemaKind::from(value)
- ))),
+ Value::Duration(duration) => {
+ let duration_bytes: [u8; 12] = duration.into();
+ visitor.visit_bytes(&duration_bytes[..])
+ }
}
}
@@ -451,8 +455,12 @@ impl<'de> de::Deserializer<'de> for &Deserializer<'de> {
}
Value::Uuid(ref u) => visitor.visit_bytes(u.as_bytes()),
Value::Decimal(ref d) => visitor.visit_bytes(&d.to_vec()?),
+ Value::Duration(ref d) => {
+ let d_bytes: [u8; 12] = d.into();
+ visitor.visit_bytes(&d_bytes[..])
+ }
_ => Err(de::Error::custom(format!(
- "Expected a String|Bytes|Fixed|Uuid|Decimal, but got {:?}",
+ "Expected a String|Bytes|Fixed|Uuid|Decimal|Duration, but got
{:?}",
self.input
))),
}
@@ -467,8 +475,14 @@ impl<'de> de::Deserializer<'de> for &Deserializer<'de> {
Value::Bytes(ref bytes) | Value::Fixed(_, ref bytes) => {
visitor.visit_byte_buf(bytes.to_owned())
}
+ Value::Uuid(ref u) =>
visitor.visit_byte_buf(Vec::from(u.as_bytes())),
+ Value::Decimal(ref d) => visitor.visit_byte_buf(d.to_vec()?),
+ Value::Duration(ref d) => {
+ let d_bytes: [u8; 12] = d.into();
+ visitor.visit_byte_buf(Vec::from(d_bytes))
+ }
_ => Err(de::Error::custom(format!(
- "Expected a String|Bytes|Fixed, but got {:?}",
+ "Expected a String|Bytes|Fixed|Uuid|Decimal|Duration, but got
{:?}",
self.input
))),
}
diff --git a/avro/src/serde/ser_schema.rs b/avro/src/serde/ser_schema.rs
index 33d5cd5..3909875 100644
--- a/avro/src/serde/ser_schema.rs
+++ b/avro/src/serde/ser_schema.rs
@@ -1143,7 +1143,7 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> {
)))
}
}
- Schema::Duration => {
+ Schema::Duration(_) => {
if value.len() == 12 {
self.writer
.write(value)
@@ -1193,7 +1193,7 @@ impl<'s, W: Write> SchemaAwareWriteSerializer<'s, W> {
| Schema::Uuid(_)
| Schema::BigDecimal
| Schema::Fixed(_)
- | Schema::Duration
+ | Schema::Duration(_)
| Schema::Decimal(_)
| Schema::Ref { name: _ } => {
encode_int(i as i32, &mut *self.writer)?;
diff --git a/avro/src/types.rs b/avro/src/types.rs
index 8cca5b9..02a5a8e 100644
--- a/avro/src/types.rs
+++ b/avro/src/types.rs
@@ -441,7 +441,7 @@ impl Value {
(&Value::Date(_), &Schema::Date) => None,
(&Value::Decimal(_), &Schema::Decimal { .. }) => None,
(&Value::BigDecimal(_), &Schema::BigDecimal) => None,
- (&Value::Duration(_), &Schema::Duration) => None,
+ (&Value::Duration(_), &Schema::Duration(_)) => None,
(&Value::Uuid(_), &Schema::Uuid(_)) => None,
(&Value::Float(_), &Schema::Float) => None,
(&Value::Float(_), &Schema::Double) => None,
@@ -471,7 +471,7 @@ impl Value {
None
}
}
- (&Value::Fixed(n, _), &Schema::Duration) => {
+ (&Value::Fixed(n, _), &Schema::Duration(_)) => {
if n != 12 {
Some(format!(
"The value's size ('{n}') must be exactly 12 to be a
Duration"
@@ -716,7 +716,7 @@ impl Value {
Schema::LocalTimestampMillis =>
self.resolve_local_timestamp_millis(),
Schema::LocalTimestampMicros =>
self.resolve_local_timestamp_micros(),
Schema::LocalTimestampNanos =>
self.resolve_local_timestamp_nanos(),
- Schema::Duration => self.resolve_duration(),
+ Schema::Duration(_) => self.resolve_duration(),
Schema::Uuid(inner) => self.resolve_uuid(inner),
}
}
@@ -754,7 +754,7 @@ impl Value {
duration @ Value::Duration { .. } => duration,
Value::Fixed(size, bytes) => {
if size != 12 {
- return Err(Details::GetDecimalFixedBytes(size).into());
+ return Err(Details::GetDurationFixedBytes(size).into());
}
Value::Duration(Duration::from([
bytes[0], bytes[1], bytes[2], bytes[3], bytes[4],
bytes[5], bytes[6], bytes[7],
@@ -1332,15 +1332,29 @@ mod tests {
),
(
Value::Fixed(12, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]),
- Schema::Duration,
+ Schema::Duration(FixedSchema {
+ name: Name::from("TestName"),
+ aliases: None,
+ doc: None,
+ size: 12,
+ default: None,
+ attributes: BTreeMap::new(),
+ }),
true,
"",
),
(
Value::Fixed(11, vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]),
- Schema::Duration,
+ Schema::Duration(FixedSchema {
+ name: Name::from("TestName"),
+ aliases: None,
+ doc: None,
+ size: 12,
+ default: None,
+ attributes: BTreeMap::new(),
+ }),
false,
- "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
for schema: Duration. Reason: The value's size ('11') must be exactly 12 to be
a Duration",
+ "Invalid value: Fixed(11, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
for schema: Duration(FixedSchema { name: Name { name: \"TestName\", namespace:
None }, aliases: None, doc: None, size: 12, default: None, attributes: {} }).
Reason: The value's size ('11') must be exactly 12 to be a Duration",
),
(
Value::Record(vec![("unknown_field_name".to_string(),
Value::Null)]),
@@ -1883,9 +1897,32 @@ Field with name '"b"' is not a member of the map items"#,
Days::new(5),
Millis::new(3000),
));
- assert!(value.clone().resolve(&Schema::Duration).is_ok());
+ assert!(
+ value
+ .clone()
+ .resolve(&Schema::Duration(FixedSchema {
+ name: Name::from("TestName"),
+ aliases: None,
+ doc: None,
+ size: 12,
+ default: None,
+ attributes: BTreeMap::new()
+ }))
+ .is_ok()
+ );
assert!(value.resolve(&Schema::TimestampMicros).is_err());
- assert!(Value::Long(1i64).resolve(&Schema::Duration).is_err());
+ assert!(
+ Value::Long(1i64)
+ .resolve(&Schema::Duration(FixedSchema {
+ name: Name::from("TestName"),
+ aliases: None,
+ doc: None,
+ size: 12,
+ default: None,
+ attributes: BTreeMap::new()
+ }))
+ .is_err()
+ );
}
#[test]
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index 7fda617..1c375d9 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -1047,7 +1047,14 @@ mod tests {
));
logical_type_test(
r#"{"type": {"type": "fixed", "name": "duration", "size": 12},
"logicalType": "duration"}"#,
- &Schema::Duration,
+ &Schema::Duration(FixedSchema {
+ name: Name::from("duration"),
+ aliases: None,
+ doc: None,
+ size: 12,
+ default: None,
+ attributes: Default::default(),
+ }),
value,
&inner,
Value::Fixed(12, vec![0, 1, 0, 0, 0, 2, 0, 0, 0, 4, 0, 0]),