This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 597db4f  fix: Also (maybe) write header on `Writer::flush`. (#222)
597db4f is described below

commit 597db4f434a2f1d32eaf7f7a01bd23e24a523683
Author: Kriskras99 <[email protected]>
AuthorDate: Wed Jul 2 09:33:10 2025 +0200

    fix: Also (maybe) write header on `Writer::flush`. (#222)
---
 avro/src/writer.rs | 52 +++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 41 insertions(+), 11 deletions(-)

diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index 413e60a..e043425 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -53,6 +53,9 @@ pub struct Writer<'a, W: Write> {
     num_values: usize,
     #[builder(default = generate_sync_marker())]
     marker: [u8; 16],
+    /// Has the header already been written.
+    ///
+    /// To disable writing the header, this can be set to `true`.
     #[builder(default = false)]
     has_header: bool,
     #[builder(default)]
@@ -152,7 +155,7 @@ impl<'a, W: Write> Writer<'a, W> {
     /// Append a compatible value (implementing the `ToAvro` trait) to a `Writer`, also performing
     /// schema validation.
     ///
-    /// Return the number of bytes written (it might be 0, see below).
+    /// Returns the number of bytes written (it might be 0, see below).
     ///
     /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
     /// internal buffering for performance reasons. If you want to be sure the value has been
@@ -166,7 +169,7 @@ impl<'a, W: Write> Writer<'a, W> {
 
     /// Append a compatible value to a `Writer`, also performing schema validation.
     ///
-    /// Return the number of bytes written (it might be 0, see below).
+    /// Returns the number of bytes written (it might be 0, see below).
     ///
     /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
     /// internal buffering for performance reasons. If you want to be sure the value has been
@@ -198,7 +201,7 @@ impl<'a, W: Write> Writer<'a, W> {
     /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
     /// validation.
     ///
-    /// Return the number of bytes written.
+    /// Returns the number of bytes written.
     ///
     /// **NOTE**: This function is not guaranteed to perform any actual write, since it relies on
     /// internal buffering for performance reasons. If you want to be sure the value has been
@@ -234,7 +237,7 @@ impl<'a, W: Write> Writer<'a, W> {
     /// Extend a `Writer` with an `Iterator` of compatible values (implementing the `ToAvro`
     /// trait), also performing schema validation.
     ///
-    /// Return the number of bytes written.
+    /// Returns the number of bytes written.
     ///
     /// **NOTE**: This function forces the written data to be flushed (an implicit
     /// call to [`flush`](Writer::flush) is performed).
@@ -269,7 +272,7 @@ impl<'a, W: Write> Writer<'a, W> {
     /// [`serde`](https://docs.serde.rs/serde/index.html) compatibility, also performing schema
     /// validation.
     ///
-    /// Return the number of bytes written.
+    /// Returns the number of bytes written.
     ///
     /// **NOTE**: This function forces the written data to be flushed (an implicit
     /// call to [`flush`](Writer::flush) is performed).
@@ -303,7 +306,7 @@ impl<'a, W: Write> Writer<'a, W> {
     /// Extend a `Writer` by appending each `Value` from a slice, while also performing schema
     /// validation on each value appended.
     ///
-    /// Return the number of bytes written.
+    /// Returns the number of bytes written.
     ///
     /// **NOTE**: This function forces the written data to be flushed (an implicit
     /// call to [`flush`](Writer::flush) is performed).
@@ -317,13 +320,17 @@ impl<'a, W: Write> Writer<'a, W> {
         Ok(num_bytes)
     }
 
-    /// Flush the content appended to a `Writer`. Call this function to make sure all the content
-    /// has been written before releasing the `Writer`.
+    /// Flush the content to the inner `Writer`.
+    ///
+    /// Call this function to make sure all the content has been written before releasing the `Writer`.
+    /// This will also write the header if it wasn't written yet and hasn't been disabled using
+    /// [`WriterBuilder::has_header`].
     ///
-    /// Return the number of bytes written.
+    /// Returns the number of bytes written.
     pub fn flush(&mut self) -> AvroResult<usize> {
+        let mut num_bytes = self.maybe_write_header()?;
         if self.num_values == 0 {
-            return Ok(0);
+            return Ok(num_bytes);
         }
 
         self.codec.compress(&mut self.buffer)?;
@@ -331,7 +338,7 @@ impl<'a, W: Write> Writer<'a, W> {
         let num_values = self.num_values;
         let stream_len = self.buffer.len();
 
-        let num_bytes = self.append_raw(&num_values.into(), &Schema::Long)?
+        num_bytes += self.append_raw(&num_values.into(), &Schema::Long)?
             + self.append_raw(&stream_len.into(), &Schema::Long)?
             + self
                 .writer
@@ -840,6 +847,29 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn avro_rs_220_flush_write_header() -> TestResult {
+        let schema = Schema::parse_str(SCHEMA)?;
+
+        // By default flush should write the header even if nothing was added yet
+        let mut writer = Writer::new(&schema, Vec::new());
+        writer.flush()?;
+        let result = writer.into_inner()?;
+        assert_eq!(result.len(), 163);
+
+        // Unless the user indicates via the builder that the header has already been written
+        let mut writer = Writer::builder()
+            .has_header(true)
+            .schema(&schema)
+            .writer(Vec::new())
+            .build();
+        writer.flush()?;
+        let result = writer.into_inner()?;
+        assert_eq!(result.len(), 0);
+
+        Ok(())
+    }
+
     #[test]
     fn test_union_not_null() -> TestResult {
         let schema = Schema::parse_str(UNION_SCHEMA)?;

Reply via email to