This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f37816  fix: flush Writer on drop (#199)
9f37816 is described below

commit 9f37816e667b63f9dd4a16d9c01cbad3b65505f2
Author: Kriskras99 <[email protected]>
AuthorDate: Fri May 30 23:10:23 2025 +0200

    fix: flush Writer on drop (#199)
    
    * fix: flush Writer on drop
    
    This brings the writer in line with standard library types like
    `BufWriter` and `LineWriter`.
    
    This is a breaking change when the inner writer is `&mut W` and is used
    later in the scope without dropping the writer or calling `into_inner`
    (as seen in the fixed tests).
    
    * Drop the Avro `writer` so that its inner `Write` impl can be used again
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    
    * Update the comments explaining why the writer is dropped manually
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    
    ---------
    
    Signed-off-by: Martin Tzvetanov Grigorov <[email protected]>
    Co-authored-by: kriskras99 <[email protected]>
    Co-authored-by: Martin Tzvetanov Grigorov <[email protected]>
---
 avro/src/writer.rs                        | 29 +++++++++++++++++++++++++++--
 avro/tests/to_from_avro_datum_schemata.rs |  3 ++-
 avro/tests/union_schema.rs                |  1 +
 3 files changed, 30 insertions(+), 3 deletions(-)

diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index a1e87dc..2f3b235 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -25,12 +25,18 @@ use crate::{
     AvroResult, Codec, Error,
 };
 use serde::Serialize;
-use std::{collections::HashMap, io::Write, marker::PhantomData, 
ops::RangeInclusive};
+use std::{
+    collections::HashMap, io::Write, marker::PhantomData, mem::ManuallyDrop, 
ops::RangeInclusive,
+};
 
 const DEFAULT_BLOCK_SIZE: usize = 16000;
 const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
 
 /// Main interface for writing Avro formatted values.
+///
+/// It is critical to call flush before `Writer<W>` is dropped. Though 
dropping will attempt to flush
+/// the contents of the buffer, any errors that happen in the process of 
dropping will be ignored.
+/// Calling flush ensures that the buffer is empty and thus dropping will not 
even attempt file operations.
 #[derive(bon::Builder)]
 pub struct Writer<'a, W: Write> {
     schema: &'a Schema,
@@ -348,7 +354,18 @@ impl<'a, W: Write> Writer<'a, W> {
     pub fn into_inner(mut self) -> AvroResult<W> {
         self.maybe_write_header()?;
         self.flush()?;
-        Ok(self.writer)
+
+        let mut this = ManuallyDrop::new(self);
+
+        // Extract every member that is not Copy and therefore should be 
dropped
+        let _resolved_schema = std::mem::take(&mut this.resolved_schema);
+        let _buffer = std::mem::take(&mut this.buffer);
+        let _user_metadata = std::mem::take(&mut this.user_metadata);
+
+        // SAFETY: double-drops are prevented by putting `this` in a 
ManuallyDrop that is never dropped
+        let writer = unsafe { std::ptr::read(&this.writer) };
+
+        Ok(writer)
     }
 
     /// Gets a reference to the underlying writer.
@@ -459,6 +476,14 @@ impl<'a, W: Write> Writer<'a, W> {
     }
 }
 
+impl<'a, W: Write> Drop for Writer<'a, W> {
+    /// Drop the writer, will try to flush ignoring any errors.
+    fn drop(&mut self) {
+        let _ = self.maybe_write_header();
+        let _ = self.flush();
+    }
+}
+
 /// Encode a compatible value (implementing the `ToAvro` trait) into Avro 
format, also performing
 /// schema validation.
 ///
diff --git a/avro/tests/to_from_avro_datum_schemata.rs 
b/avro/tests/to_from_avro_datum_schemata.rs
index 6f2d595..9986cbb 100644
--- a/avro/tests/to_from_avro_datum_schemata.rs
+++ b/avro/tests/to_from_avro_datum_schemata.rs
@@ -110,9 +110,10 @@ fn test_avro_3683_multiple_schemata_writer_reader() -> 
TestResult {
     let mut writer = Writer::with_schemata(schema_b, schemata.clone(), &mut 
output, Codec::Null);
     writer.append(record.clone())?;
     writer.flush()?;
+    drop(writer); //drop the writer so that `output` is no more referenced 
mutably
 
     let reader = Reader::with_schemata(schema_b, schemata, output.as_slice())?;
-    let value = reader.into_iter().next().unwrap().unwrap();
+    let value = reader.into_iter().next().unwrap()?;
     assert_eq!(value, record);
 
     Ok(())
diff --git a/avro/tests/union_schema.rs b/avro/tests/union_schema.rs
index 106b062..db0f356 100644
--- a/avro/tests/union_schema.rs
+++ b/avro/tests/union_schema.rs
@@ -74,6 +74,7 @@ where
         Writer::with_schemata(schema, schemata.iter().collect(), &mut encoded, 
Codec::Null);
     writer.append_ser(input)?;
     writer.flush()?;
+    drop(writer); //drop the writer so that `encoded` is no more referenced 
mutably
 
     let mut reader = Reader::with_schemata(schema, schemata.iter().collect(), 
encoded.as_slice())?;
     from_value::<T>(&reader.next().expect("")?)

Reply via email to