This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 6218cd7  fix: Ensure SpecificSingleObjectWriter writes header on every call (#438)
6218cd7 is described below

commit 6218cd723a02e947c5d799065506a026c92388b5
Author: Felix Obenhuber <[email protected]>
AuthorDate: Mon Jan 26 13:10:03 2026 +0100

    fix: Ensure SpecificSingleObjectWriter writes header on every call (#438)
    
    * fix: Ensure SpecificSingleObjectWriter writes header on every call
    
    Previously, SpecificSingleObjectWriter had a header_written flag that
    caused it to only write the single-object encoding header on the first
    call to write_ref(). Subsequent calls would write only the data, making
    those messages unreadable since they lacked the required header.
    
    According to the Avro single-object encoding specification, each message
    must be independently decodable with its own header (2-byte marker +
    8-byte schema fingerprint). This is essential for use cases like message
    queues where individual messages are stored/transmitted separately.
    
    This fix removes the header_written flag and makes write_ref() always
    write the header, matching GenericSingleObjectWriter's behavior. A new
    test verifies that multiple writes to the same buffer each produce
    independently decodable messages.
    
    * Use write_all instead of write
    
    Use `Write::write_all` instead of `write` to ensure _all_ bytes are written;
    a single `write` call may perform only a partial (short) write.
    
    Co-authored-by: Martin Grigorov <[email protected]>
    
    * fix: Simplify documentation
    
    * test: Verify SpecificSingleObjectWriter writes header on multiple calls
    
    Adds a loop that writes the same object twice to ensure the header is
    correctly written on every call, not just the first one.
    
    * Fix clippy
    
    * Remove trailing whitespace
    
    * Remove trailing whitespace
    
    ---------
    
    Co-authored-by: Felix Obenhuber <[email protected]>
    Co-authored-by: Martin Grigorov <[email protected]>
    Co-authored-by: default <[email protected]>
---
 avro/examples/specific_single_object.rs | 38 ++++++++++++++-----------
 avro/src/writer.rs                      | 49 ++++++++++++++++++++-------------
 2 files changed, 52 insertions(+), 35 deletions(-)

diff --git a/avro/examples/specific_single_object.rs b/avro/examples/specific_single_object.rs
index 1ade06f..fa756cf 100644
--- a/avro/examples/specific_single_object.rs
+++ b/avro/examples/specific_single_object.rs
@@ -17,6 +17,7 @@
 
 use apache_avro::{AvroSchema, SpecificSingleObjectReader, 
SpecificSingleObjectWriter};
 use serde::{Deserialize, Serialize};
+use std::iter::repeat_n;
 
 #[derive(Debug, Clone, Serialize, Deserialize, AvroSchema, PartialEq)]
 struct Test {
@@ -32,24 +33,29 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     };
 
     let mut writer = SpecificSingleObjectWriter::<Test>::with_capacity(1024)?;
-    match writer.write(test.clone(), &mut buffer) {
-        Ok(bytes_written) => {
-            assert_eq!(bytes_written, 15);
-            assert_eq!(
-                buffer,
-                vec![
-                    195, 1, 166, 59, 243, 49, 82, 230, 8, 161, 54, 6, 102, 
111, 111
-                ]
-            );
-        }
-        Err(err) => {
-            panic!("Error during serialization: {err:?}");
+    let reader = SpecificSingleObjectReader::<Test>::new()?;
+
+    for test in repeat_n(test, 2) {
+        buffer.clear();
+
+        match writer.write(test.clone(), &mut buffer) {
+            Ok(bytes_written) => {
+                assert_eq!(bytes_written, 15);
+                assert_eq!(
+                    buffer,
+                    vec![
+                        195, 1, 166, 59, 243, 49, 82, 230, 8, 161, 54, 6, 102, 
111, 111
+                    ]
+                );
+            }
+            Err(err) => {
+                panic!("Error during serialization: {err:?}");
+            }
         }
-    }
 
-    let reader = SpecificSingleObjectReader::<Test>::new()?;
-    let read = reader.read(&mut buffer.as_slice())?;
-    assert_eq!(test, read);
+        let read = reader.read(&mut buffer.as_slice())?;
+        assert_eq!(test, read);
+    }
 
     Ok(())
 }
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index 4ec219f..72f87d0 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -596,7 +596,6 @@ where
 {
     inner: GenericSingleObjectWriter,
     schema: Schema,
-    header_written: bool,
     _model: PhantomData<T>,
 }
 
@@ -609,7 +608,6 @@ where
         Ok(SpecificSingleObjectWriter {
             inner: GenericSingleObjectWriter::new_with_capacity(&schema, 
buffer_cap)?,
             schema,
-            header_written: false,
             _model: PhantomData,
         })
     }
@@ -631,25 +629,31 @@ impl<T> SpecificSingleObjectWriter<T>
 where
     T: AvroSchema + Serialize,
 {
-    /// Write the referenced `Serialize` object to the provided `Write` 
object. Returns a result with
-    /// the number of bytes written including the header
+    /// Write the referenced `Serialize` object to the provided `Write` object.
+    ///
+    /// Returns the number of bytes written.
+    ///
+    /// Each call writes a complete single-object encoded message (header + 
data),
+    /// making each message independently decodable.
     pub fn write_ref<W: Write>(&mut self, data: &T, writer: &mut W) -> 
AvroResult<usize> {
-        let mut bytes_written: usize = 0;
-
-        if !self.header_written {
-            bytes_written += writer
-                .write(self.inner.buffer.as_slice())
-                .map_err(Details::WriteBytes)?;
-            self.header_written = true;
-        }
+        // Always write the header for each message (single object encoding 
requires
+        // each message to be independently decodable)
+        writer
+            .write_all(self.inner.buffer.as_slice())
+            .map_err(Details::WriteBytes)?;
 
-        bytes_written += write_avro_datum_ref(&self.schema, data, writer)?;
+        let bytes_written =
+            self.inner.buffer.len() + write_avro_datum_ref(&self.schema, data, 
writer)?;
 
         Ok(bytes_written)
     }
 
-    /// Write the Serialize object to the provided Write object. Returns a 
result with the number
-    /// of bytes written including the header
+    /// Write the Serialize object to the provided Write object.
+    ///
+    /// Returns the number of bytes written.
+    ///
+    /// Each call writes a complete single-object encoded message (header + 
data),
+    /// making each message independently decodable.
     pub fn write<W: Write>(&mut self, data: T, writer: &mut W) -> 
AvroResult<usize> {
         self.write_ref(&data, writer)
     }
@@ -1575,6 +1579,7 @@ mod tests {
         let mut buf1: Vec<u8> = Vec::new();
         let mut buf2: Vec<u8> = Vec::new();
         let mut buf3: Vec<u8> = Vec::new();
+        let mut buf4: Vec<u8> = Vec::new();
 
         let mut generic_writer = GenericSingleObjectWriter::new_with_capacity(
             &TestSingleObjectWriter::get_schema(),
@@ -1585,16 +1590,22 @@ mod tests {
             
SpecificSingleObjectWriter::<TestSingleObjectWriter>::with_capacity(1024)
                 .expect("Resolved should pass");
         specific_writer
-            .write(obj1.clone(), &mut buf1)
+            .write_ref(&obj1, &mut buf1)
             .expect("Serialization expected");
         specific_writer
-            .write_value(obj1.clone(), &mut buf2)
+            .write_ref(&obj1, &mut buf2)
             .expect("Serialization expected");
+        specific_writer
+            .write_value(obj1.clone(), &mut buf3)
+            .expect("Serialization expected");
+
         generic_writer
-            .write_value(obj1.into(), &mut buf3)
+            .write_value(obj1.into(), &mut buf4)
             .expect("Serialization expected");
+
         assert_eq!(buf1, buf2);
-        assert_eq!(buf1, buf3);
+        assert_eq!(buf2, buf3);
+        assert_eq!(buf3, buf4);
 
         Ok(())
     }

Reply via email to