This is an automated email from the ASF dual-hosted git repository.

mgrigorov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 0868a8b  AVRO-4063: Call `flush` on the inner `writer` during 
`Writer::flush` (#14)
0868a8b is described below

commit 0868a8b2d6aa90070bcb6a3388555746d3e44546
Author: Jane Lewis <[email protected]>
AuthorDate: Wed Oct 2 02:35:26 2024 -0700

    AVRO-4063: Call `flush` on the inner `writer` during `Writer::flush` (#14)
---
 avro/src/error.rs  |  3 +++
 avro/src/writer.rs | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 61 insertions(+)

diff --git a/avro/src/error.rs b/avro/src/error.rs
index d92daa4..09d458c 100644
--- a/avro/src/error.rs
+++ b/avro/src/error.rs
@@ -446,6 +446,9 @@ pub enum Error {
     #[error("Failed to write buffer bytes during flush: {0}")]
     WriteBytes(#[source] std::io::Error),
 
+    #[error("Failed to flush inner writer during flush: {0}")]
+    FlushWriter(#[source] std::io::Error),
+
     #[error("Failed to write marker: {0}")]
     WriteMarker(#[source] std::io::Error),
 
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index 5010bff..f1fdae4 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -315,6 +315,8 @@ impl<'a, W: Write> Writer<'a, W> {
         self.buffer.clear();
         self.num_values = 0;
 
+        self.writer.flush().map_err(Error::FlushWriter)?;
+
         Ok(num_bytes)
     }
 
@@ -657,6 +659,8 @@ fn generate_sync_marker() -> [u8; 16] {
 
 #[cfg(test)]
 mod tests {
+    use std::{cell::RefCell, rc::Rc};
+
     use super::*;
     use crate::{
         decimal::Decimal,
@@ -1444,4 +1448,58 @@ mod tests {
         }
         Ok(())
     }
+
+    #[test]
+    fn avro_4063_flush_applies_to_inner_writer() -> TestResult {
+        const SCHEMA: &str = r#"
+        {
+            "type": "record",
+            "name": "ExampleSchema",
+            "fields": [
+                {"name": "exampleField", "type": "string"}
+            ]
+        }
+        "#;
+
+        #[derive(Clone, Default)]
+        struct TestBuffer(Rc<RefCell<Vec<u8>>>);
+
+        impl TestBuffer {
+            fn len(&self) -> usize {
+                self.0.borrow().len()
+            }
+        }
+
+        impl Write for TestBuffer {
+            fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+                self.0.borrow_mut().write(buf)
+            }
+
+            fn flush(&mut self) -> std::io::Result<()> {
+                Ok(())
+            }
+        }
+
+        let shared_buffer = TestBuffer::default();
+
+        let buffered_writer = std::io::BufWriter::new(shared_buffer.clone());
+
+        let schema = Schema::parse_str(SCHEMA)?;
+
+        let mut writer = Writer::new(&schema, buffered_writer);
+
+        let mut record = Record::new(writer.schema()).unwrap();
+        record.put("exampleField", "value");
+
+        writer.append(record)?;
+        writer.flush()?;
+
+        assert_eq!(
+            shared_buffer.len(),
+            167,
+            "the test buffer was not fully written to after Writer::flush was 
called"
+        );
+
+        Ok(())
+    }
 }

Reply via email to