This is an automated email from the ASF dual-hosted git repository.
mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 6218cd7 fix: Ensure SpecificSingleObjectWriter writes header on every
call (#438)
6218cd7 is described below
commit 6218cd723a02e947c5d799065506a026c92388b5
Author: Felix Obenhuber <[email protected]>
AuthorDate: Mon Jan 26 13:10:03 2026 +0100
fix: Ensure SpecificSingleObjectWriter writes header on every call (#438)
* fix: Ensure SpecificSingleObjectWriter writes header on every call
Previously, SpecificSingleObjectWriter had a header_written flag that
caused it to only write the single-object encoding header on the first
call to write_ref(). Subsequent calls would write only the data, making
those messages unreadable since they lacked the required header.
According to the Avro single-object encoding specification, each message
must be independently decodable with its own header (2-byte marker +
8-byte schema fingerprint). This is essential for use cases like message
queues where individual messages are stored/transmitted separately.
This fix removes the header_written flag and makes write_ref() always
write the header, matching GenericSingleObjectWriter's behavior. The
updated test and example verify that repeated writes with the same writer
each produce a complete, independently decodable message.
* Use write_all instead of write
Use `Write::write_all` instead of `write` to ensure _all_ bytes are written;
a single `write` call may perform a partial write and report fewer bytes.
Co-authored-by: Martin Grigorov <[email protected]>
* fix: Simplify documentation
* test: Verify SpecificSingleObjectWriter writes header on multiple calls
Adds a loop that writes the same object twice to ensure the header is
correctly written on every call, not just the first one.
* Fix clippy
* Remove trailing whitespace
* Remove trailing whitespace
---------
Co-authored-by: Felix Obenhuber <[email protected]>
Co-authored-by: Martin Grigorov <[email protected]>
Co-authored-by: default <[email protected]>
---
avro/examples/specific_single_object.rs | 38 ++++++++++++++-----------
avro/src/writer.rs | 49 ++++++++++++++++++++-------------
2 files changed, 52 insertions(+), 35 deletions(-)
diff --git a/avro/examples/specific_single_object.rs
b/avro/examples/specific_single_object.rs
index 1ade06f..fa756cf 100644
--- a/avro/examples/specific_single_object.rs
+++ b/avro/examples/specific_single_object.rs
@@ -17,6 +17,7 @@
use apache_avro::{AvroSchema, SpecificSingleObjectReader,
SpecificSingleObjectWriter};
use serde::{Deserialize, Serialize};
+use std::iter::repeat_n;
#[derive(Debug, Clone, Serialize, Deserialize, AvroSchema, PartialEq)]
struct Test {
@@ -32,24 +33,29 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
};
let mut writer = SpecificSingleObjectWriter::<Test>::with_capacity(1024)?;
- match writer.write(test.clone(), &mut buffer) {
- Ok(bytes_written) => {
- assert_eq!(bytes_written, 15);
- assert_eq!(
- buffer,
- vec![
- 195, 1, 166, 59, 243, 49, 82, 230, 8, 161, 54, 6, 102,
111, 111
- ]
- );
- }
- Err(err) => {
- panic!("Error during serialization: {err:?}");
+ let reader = SpecificSingleObjectReader::<Test>::new()?;
+
+ for test in repeat_n(test, 2) {
+ buffer.clear();
+
+ match writer.write(test.clone(), &mut buffer) {
+ Ok(bytes_written) => {
+ assert_eq!(bytes_written, 15);
+ assert_eq!(
+ buffer,
+ vec![
+ 195, 1, 166, 59, 243, 49, 82, 230, 8, 161, 54, 6, 102,
111, 111
+ ]
+ );
+ }
+ Err(err) => {
+ panic!("Error during serialization: {err:?}");
+ }
}
- }
- let reader = SpecificSingleObjectReader::<Test>::new()?;
- let read = reader.read(&mut buffer.as_slice())?;
- assert_eq!(test, read);
+ let read = reader.read(&mut buffer.as_slice())?;
+ assert_eq!(test, read);
+ }
Ok(())
}
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index 4ec219f..72f87d0 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -596,7 +596,6 @@ where
{
inner: GenericSingleObjectWriter,
schema: Schema,
- header_written: bool,
_model: PhantomData<T>,
}
@@ -609,7 +608,6 @@ where
Ok(SpecificSingleObjectWriter {
inner: GenericSingleObjectWriter::new_with_capacity(&schema,
buffer_cap)?,
schema,
- header_written: false,
_model: PhantomData,
})
}
@@ -631,25 +629,31 @@ impl<T> SpecificSingleObjectWriter<T>
where
T: AvroSchema + Serialize,
{
- /// Write the referenced `Serialize` object to the provided `Write`
object. Returns a result with
- /// the number of bytes written including the header
+ /// Write the referenced `Serialize` object to the provided `Write` object.
+ ///
+ /// Returns the number of bytes written.
+ ///
+ /// Each call writes a complete single-object encoded message (header +
data),
+ /// making each message independently decodable.
pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) ->
AvroResult<usize> {
- let mut bytes_written: usize = 0;
-
- if !self.header_written {
- bytes_written += writer
- .write(self.inner.buffer.as_slice())
- .map_err(Details::WriteBytes)?;
- self.header_written = true;
- }
+ // Always write the header for each message (single object encoding
requires
+ // each message to be independently decodable)
+ writer
+ .write_all(self.inner.buffer.as_slice())
+ .map_err(Details::WriteBytes)?;
- bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
+ let bytes_written =
+ self.inner.buffer.len() + write_avro_datum_ref(&self.schema, data,
writer)?;
Ok(bytes_written)
}
- /// Write the Serialize object to the provided Write object. Returns a
result with the number
- /// of bytes written including the header
+ /// Write the Serialize object to the provided Write object.
+ ///
+ /// Returns the number of bytes written.
+ ///
+ /// Each call writes a complete single-object encoded message (header +
data),
+ /// making each message independently decodable.
pub fn write<W: Write>(&mut self, data: T, writer: &mut W) ->
AvroResult<usize> {
self.write_ref(&data, writer)
}
@@ -1575,6 +1579,7 @@ mod tests {
let mut buf1: Vec<u8> = Vec::new();
let mut buf2: Vec<u8> = Vec::new();
let mut buf3: Vec<u8> = Vec::new();
+ let mut buf4: Vec<u8> = Vec::new();
let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
&TestSingleObjectWriter::get_schema(),
@@ -1585,16 +1590,22 @@ mod tests {
SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
.expect("Resolved should pass");
specific_writer
- .write(obj1.clone(), &mut buf1)
+ .write_ref(&obj1, &mut buf1)
.expect("Serialization expected");
specific_writer
- .write_value(obj1.clone(), &mut buf2)
+ .write_ref(&obj1, &mut buf2)
.expect("Serialization expected");
+ specific_writer
+ .write_value(obj1.clone(), &mut buf3)
+ .expect("Serialization expected");
+
generic_writer
- .write_value(obj1.into(), &mut buf3)
+ .write_value(obj1.into(), &mut buf4)
.expect("Serialization expected");
+
assert_eq!(buf1, buf2);
- assert_eq!(buf1, buf3);
+ assert_eq!(buf2, buf3);
+ assert_eq!(buf3, buf4);
Ok(())
}