This is an automated email from the ASF dual-hosted git repository.
kriskras99 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b7fa60d fix: Don't depend on Serde to provide fields in the right order (#351)
b7fa60d is described below
commit b7fa60df5988366b86b4ff43172d731ecac3aa90
Author: Kriskras99 <[email protected]>
AuthorDate: Mon Dec 8 13:10:44 2025 +0100
fix: Don't depend on Serde to provide fields in the right order (#351)
* fix: Different field order between Serde and the Schema
* fix: Downgrade assert to debug_assert
---------
Co-authored-by: default <[email protected]>
---
avro/src/error.rs | 5 +-
avro/src/ser_schema.rs | 192 ++++++++++++++++++++++++++++++++++++++++------
avro/tests/avro-rs-226.rs | 73 +++++++++++++++++-
3 files changed, 243 insertions(+), 27 deletions(-)
diff --git a/avro/src/error.rs b/avro/src/error.rs
index 95aeb2b..a3e2cf0 100644
--- a/avro/src/error.rs
+++ b/avro/src/error.rs
@@ -516,11 +516,14 @@ pub enum Details {
#[error("Failed to serialize field '{field_name}' for record {record_schema:?}: {error}")]
SerializeRecordFieldWithSchema {
- field_name: &'static str,
+ field_name: String,
record_schema: Schema,
error: Box<Error>,
},
+ #[error("Missing default for skipped field '{field_name}' for schema {schema:?}")]
+ MissingDefaultForSkippedField { field_name: String, schema: Schema },
+
#[error("Failed to deserialize Avro value into value: {0}")]
DeserializeValue(String),
diff --git a/avro/src/ser_schema.rs b/avro/src/ser_schema.rs
index 5a91eb6..f9ee2fc 100644
--- a/avro/src/ser_schema.rs
+++ b/avro/src/ser_schema.rs
@@ -25,8 +25,8 @@ use crate::{
schema::{Name, NamesRef, Namespace, RecordField, RecordSchema, Schema},
};
use bigdecimal::BigDecimal;
-use serde::{Serialize, ser};
-use std::{borrow::Cow, io::Write, str::FromStr};
+use serde::ser;
+use std::{borrow::Cow, cmp::Ordering, collections::HashMap, io::Write,
str::FromStr};
const COLLECTION_SERIALIZER_ITEM_LIMIT: usize = 1024;
const COLLECTION_SERIALIZER_DEFAULT_INIT_ITEM_CAPACITY: usize = 32;
@@ -249,6 +249,9 @@ impl<W: Write> ser::SerializeMap for
SchemaAwareWriteSerializeMap<'_, '_, W> {
pub struct SchemaAwareWriteSerializeStruct<'a, 's, W: Write> {
ser: &'a mut SchemaAwareWriteSerializer<'s, W>,
record_schema: &'s RecordSchema,
+ /// Fields we received in the wrong order
+ field_cache: HashMap<usize, Vec<u8>>,
+ field_position: usize,
bytes_written: usize,
}
@@ -260,6 +263,8 @@ impl<'a, 's, W: Write> SchemaAwareWriteSerializeStruct<'a,
's, W> {
SchemaAwareWriteSerializeStruct {
ser,
record_schema,
+ field_cache: HashMap::new(),
+ field_position: 0,
bytes_written: 0,
}
}
@@ -268,19 +273,85 @@ impl<'a, 's, W: Write>
SchemaAwareWriteSerializeStruct<'a, 's, W> {
where
T: ?Sized + ser::Serialize,
{
- // If we receive fields in order, write them directly to the main writer
- let mut value_ser = SchemaAwareWriteSerializer::new(
- &mut *self.ser.writer,
- &field.schema,
- self.ser.names,
- self.ser.enclosing_namespace.clone(),
- );
- self.bytes_written += value.serialize(&mut value_ser)?;
-
- Ok(())
+ match self.field_position.cmp(&field.position) {
+ Ordering::Equal => {
+ // If we receive fields in order, write them directly to the main writer
+ let mut value_ser = SchemaAwareWriteSerializer::new(
+ &mut *self.ser.writer,
+ &field.schema,
+ self.ser.names,
+ self.ser.enclosing_namespace.clone(),
+ );
+ self.bytes_written += value.serialize(&mut value_ser)?;
+
+ self.field_position += 1;
+ while let Some(bytes) =
self.field_cache.remove(&self.field_position) {
+ self.ser
+ .writer
+ .write_all(&bytes)
+ .map_err(Details::WriteBytes)?;
+ self.bytes_written += bytes.len();
+ self.field_position += 1;
+ }
+ Ok(())
+ }
+ Ordering::Less => {
+ // Current field position is smaller than this field position,
+ // so we're still missing at least one field, save this field temporarily
+ let mut bytes = Vec::new();
+ let mut value_ser = SchemaAwareWriteSerializer::new(
+ &mut bytes,
+ &field.schema,
+ self.ser.names,
+ self.ser.enclosing_namespace.clone(),
+ );
+ value.serialize(&mut value_ser)?;
+ if self.field_cache.insert(field.position, bytes).is_some() {
+ Err(Details::FieldNameDuplicate(field.name.clone()).into())
+ } else {
+ Ok(())
+ }
+ }
+ Ordering::Greater => {
+ // Current field position is greater than this field position,
+ // so we've already had this field
+ Err(Details::FieldNameDuplicate(field.name.clone()).into())
+ }
+ }
}
- fn end(self) -> Result<usize, Error> {
+ fn end(mut self) -> Result<usize, Error> {
+ // Write any fields that are `serde(skip)` or `serde(skip_serializing)`
+ while self.field_position != self.record_schema.fields.len() {
+ let field_info = &self.record_schema.fields[self.field_position];
+ if let Some(bytes) = self.field_cache.remove(&self.field_position)
{
+ self.ser
+ .writer
+ .write_all(&bytes)
+ .map_err(Details::WriteBytes)?;
+ self.bytes_written += bytes.len();
+ self.field_position += 1;
+ } else if let Some(default) = &field_info.default {
+ self.serialize_next_field(field_info, default)
+ .map_err(|e| Details::SerializeRecordFieldWithSchema {
+ field_name: field_info.name.clone(),
+ record_schema:
Schema::Record(self.record_schema.clone()),
+ error: Box::new(e),
+ })?;
+ } else {
+ return Err(Details::MissingDefaultForSkippedField {
+ field_name: field_info.name.clone(),
+ schema: Schema::Record(self.record_schema.clone()),
+ }
+ .into());
+ }
+ }
+
+ debug_assert!(
+ self.field_cache.is_empty(),
+ "There should be no more unwritten fields at this point: {:?}",
+ self.field_cache
+ );
Ok(self.bytes_written)
}
}
@@ -304,7 +375,7 @@ impl<W: Write> ser::SerializeStruct for
SchemaAwareWriteSerializeStruct<'_, '_,
// self.item_count += 1;
self.serialize_next_field(field, value).map_err(|e| {
Details::SerializeRecordFieldWithSchema {
- field_name: key,
+ field_name: key.to_string(),
record_schema:
Schema::Record(self.record_schema.clone()),
error: Box::new(e),
}
@@ -323,15 +394,20 @@ impl<W: Write> ser::SerializeStruct for
SchemaAwareWriteSerializeStruct<'_, '_,
.and_then(|idx| self.record_schema.fields.get(*idx));
if let Some(skipped_field) = skipped_field {
- // self.item_count += 1;
- skipped_field
- .default
- .serialize(&mut SchemaAwareWriteSerializer::new(
- self.ser.writer,
- &skipped_field.schema,
- self.ser.names,
- self.ser.enclosing_namespace.clone(),
- ))?;
+ if let Some(default) = &skipped_field.default {
+ self.serialize_next_field(skipped_field, default)
+ .map_err(|e| Details::SerializeRecordFieldWithSchema {
+ field_name: key.to_string(),
+ record_schema:
Schema::Record(self.record_schema.clone()),
+ error: Box::new(e),
+ })?;
+ } else {
+ return Err(Details::MissingDefaultForSkippedField {
+ field_name: key.to_string(),
+ schema: Schema::Record(self.record_schema.clone()),
+ }
+ .into());
+ }
} else {
return Err(Details::GetField(key.to_string()).into());
}
@@ -1741,12 +1817,13 @@ impl<'a, 's, W: Write> ser::Serializer for &'a mut
SchemaAwareWriteSerializer<'s
mod tests {
use super::*;
use crate::{
- Days, Duration, Millis, Months, decimal::Decimal, error::Details,
schema::ResolvedSchema,
+ Days, Duration, Millis, Months, Reader, Writer, decimal::Decimal,
error::Details,
+ from_value, schema::ResolvedSchema,
};
use apache_avro_test_helper::TestResult;
use bigdecimal::BigDecimal;
use num_bigint::{BigInt, Sign};
- use serde::Serialize;
+ use serde::{Deserialize, Serialize};
use serde_bytes::{ByteArray, Bytes};
use std::{
collections::{BTreeMap, HashMap},
@@ -2900,4 +2977,69 @@ mod tests {
string_record.serialize(&mut serializer)?;
Ok(())
}
+
+ #[test]
+ fn avro_rs_351_different_field_order_serde_vs_schema() -> TestResult {
+ #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+ struct Foo {
+ a: String,
+ b: String,
+ c: usize,
+ d: f64,
+ e: usize,
+ }
+ let schema = Schema::parse_str(
+ r#"
+ {
+ "type":"record",
+ "name":"Foo",
+ "fields": [
+ {
+ "name":"b",
+ "type":"string"
+ },
+ {
+ "name":"a",
+ "type":"string"
+ },
+ {
+ "name":"d",
+ "type":"double"
+ },
+ {
+ "name":"e",
+ "type":"long"
+ },
+ {
+ "name":"c",
+ "type":"long"
+ }
+ ]
+ }
+ "#,
+ )?;
+
+ let mut writer = Writer::new(&schema, Vec::new())?;
+ writer.append_ser(Foo {
+ a: "Hello".into(),
+ b: "World".into(),
+ c: 42,
+ d: std::f64::consts::PI,
+ e: 5,
+ })?;
+ let encoded = writer.into_inner()?;
+ let mut reader = Reader::with_schema(&schema, &encoded[..])?;
+ let decoded = from_value::<Foo>(&reader.next().unwrap()?)?;
+ assert_eq!(
+ decoded,
+ Foo {
+ a: "Hello".into(),
+ b: "World".into(),
+ c: 42,
+ d: std::f64::consts::PI,
+ e: 5
+ }
+ );
+ Ok(())
+ }
}
diff --git a/avro/tests/avro-rs-226.rs b/avro/tests/avro-rs-226.rs
index 385b42e..15f060b 100644
--- a/avro/tests/avro-rs-226.rs
+++ b/avro/tests/avro-rs-226.rs
@@ -45,6 +45,7 @@ fn
avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_middle_field
struct T {
x: Option<i8>,
#[serde(skip_serializing_if = "Option::is_none")]
+ #[avro(default = "null")]
y: Option<String>,
z: Option<i8>,
}
@@ -64,6 +65,7 @@ fn
avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_first_field(
#[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
struct T {
#[serde(skip_serializing_if = "Option::is_none")]
+ #[avro(default = "null")]
x: Option<i8>,
y: Option<String>,
z: Option<i8>,
@@ -86,6 +88,7 @@ fn
avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_last_field()
x: Option<i8>,
y: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
+ #[avro(default = "null")]
z: Option<i8>,
}
@@ -100,18 +103,20 @@ fn
avro_rs_226_index_out_of_bounds_with_serde_skip_serializing_skip_last_field()
}
#[test]
-#[ignore = "This test should be re-enabled once the serde-driven deserialization is implemented! PR #227"]
fn avro_rs_226_index_out_of_bounds_with_serde_skip_multiple_fields() ->
TestResult {
#[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
struct T {
no_skip1: Option<i8>,
#[serde(skip_serializing)]
+ #[avro(default = "null")]
skip_serializing: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
+ #[avro(default = "null")]
skip_serializing_if: Option<i8>,
#[serde(skip_deserializing)]
skip_deserializing: Option<String>,
#[serde(skip)]
+ #[avro(skip)]
skip: Option<String>,
no_skip2: Option<i8>,
}
@@ -128,3 +133,69 @@ fn
avro_rs_226_index_out_of_bounds_with_serde_skip_multiple_fields() -> TestResu
},
)
}
+
+#[test]
+#[should_panic(expected = "Missing default for skipped field 'y' for schema")]
+fn avro_rs_351_no_default_for_serde_skip_serializing_if_should_panic() {
+ #[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
+ struct T {
+ x: Option<i8>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ y: Option<String>,
+ z: Option<i8>,
+ }
+
+ ser_deser::<T>(
+ &T::get_schema(),
+ T {
+ x: None,
+ y: None,
+ z: Some(1),
+ },
+ )
+ .unwrap()
+}
+
+#[test]
+#[should_panic(expected = "Missing default for skipped field 'x' for schema")]
+fn avro_rs_351_no_default_for_serde_skip_should_panic() {
+ #[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
+ struct T {
+ #[serde(skip)]
+ x: Option<i8>,
+ y: Option<String>,
+ z: Option<i8>,
+ }
+
+ ser_deser::<T>(
+ &T::get_schema(),
+ T {
+ x: None,
+ y: None,
+ z: Some(1),
+ },
+ )
+ .unwrap()
+}
+
+#[test]
+#[should_panic(expected = "Missing default for skipped field 'z' for schema")]
+fn avro_rs_351_no_default_for_serde_skip_serializing_should_panic() {
+ #[derive(AvroSchema, Clone, Debug, Deserialize, PartialEq, Serialize)]
+ struct T {
+ x: Option<i8>,
+ y: Option<String>,
+ #[serde(skip_serializing)]
+ z: Option<i8>,
+ }
+
+ ser_deser::<T>(
+ &T::get_schema(),
+ T {
+ x: Some(0),
+ y: None,
+ z: None,
+ },
+ )
+ .unwrap()
+}