This is an automated email from the ASF dual-hosted git repository.
kriskras99 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/avro-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0683bae feat!: resolve schema(ta) in the builder of `Writer` (#328)
0683bae is described below
commit 0683bae14e201ae71511069b352d6c2662b62445
Author: Kriskras99 <[email protected]>
AuthorDate: Thu Oct 30 22:30:09 2025 +0100
feat!: resolve schema(ta) in the builder of `Writer` (#328)
This moves the resolving error to the place where it should happen
and simplifies `Writer` functions.
This is a breaking change, as the builder now returns a `AvroResult`.
The actual relevant code is in the first 400 lines of `avro/src/writer.rs`.
All other changes are adding `?` or `.unwrap()` to test code.
---
avro/README.md | 10 +-
avro/benches/serde.rs | 4 +-
avro/examples/benchmark.rs | 2 +-
avro/examples/generate_interop_data.rs | 2 +-
avro/src/bigdecimal.rs | 2 +-
avro/src/lib.rs | 26 ++--
avro/src/reader.rs | 2 +-
avro/src/schema.rs | 2 +-
avro/src/schema_compatibility.rs | 4 +-
avro/src/writer.rs | 185 ++++++++++++------------
avro/tests/append_to_existing.rs | 11 +-
avro/tests/avro-rs-226.rs | 2 +-
avro/tests/avro-rs-285-bytes_deserialization.rs | 4 +-
avro/tests/codecs.rs | 2 +-
avro/tests/schema.rs | 28 ++--
avro/tests/shared.rs | 2 +-
avro/tests/to_from_avro_datum_schemata.rs | 2 +-
avro/tests/union_schema.rs | 2 +-
avro_derive/README.md | 2 +-
avro_derive/tests/derive.rs | 2 +-
wasm-demo/tests/demos.rs | 3 +-
21 files changed, 152 insertions(+), 147 deletions(-)
diff --git a/avro/README.md b/avro/README.md
index 57e7194..d11cbba 100644
--- a/avro/README.md
+++ b/avro/README.md
@@ -206,7 +206,7 @@ associated type provided by the library to specify the data
we want to serialize
use apache_avro::types::Record;
use apache_avro::Writer;
// a writer needs a schema and something to write to
-let mut writer = Writer::new(&schema, Vec::new());
+let mut writer = Writer::new(&schema, Vec::new()).unwrap();
// the Record type models our Record schema
let mut record = Record::new(writer.schema()).unwrap();
@@ -248,7 +248,7 @@ struct Test {
}
// a writer needs a schema and something to write to
-let mut writer = Writer::new(&schema, Vec::new());
+let mut writer = Writer::new(&schema, Vec::new()).unwrap();
// the structure models our Record schema
let test = Test {
@@ -421,7 +421,7 @@ fn main() -> Result<(), Error> {
println!("{:?}", schema);
- let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Deflate(DeflateSettings::default()));
+ let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Deflate(DeflateSettings::default())).unwrap();
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
@@ -547,7 +547,7 @@ fn main() -> Result<(), Error> {
println!("{:?}", schema);
- let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Deflate(DeflateSettings::default()));
+ let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Deflate(DeflateSettings::default())).unwrap() ;
let mut record = Record::new(writer.schema()).unwrap();
record.put("decimal_fixed",
Decimal::from(9936.to_bigint().unwrap().to_signed_bytes_be()));
@@ -809,7 +809,7 @@ fn serde_byte_array() {
];
// Serialize records to Avro binary format with the schema
- let mut writer = apache_avro::Writer::new(&schema, Vec::new());
+ let mut writer = apache_avro::Writer::new(&schema, Vec::new()).unwrap();
for record in &records {
writer.append_ser(record).unwrap();
}
diff --git a/avro/benches/serde.rs b/avro/benches/serde.rs
index 36484c3..471c082 100644
--- a/avro/benches/serde.rs
+++ b/avro/benches/serde.rs
@@ -239,13 +239,13 @@ fn make_records_ser<T: Serialize + Clone>(record: T,
count: usize) -> Vec<T> {
}
fn write(schema: &Schema, records: &[Value]) -> AvroResult<Vec<u8>> {
- let mut writer = Writer::new(schema, Vec::new());
+ let mut writer = Writer::new(schema, Vec::new())?;
writer.extend_from_slice(records).unwrap();
writer.into_inner()
}
fn write_ser<T: Serialize>(schema: &Schema, records: &[T]) ->
AvroResult<Vec<u8>> {
- let mut writer = Writer::new(schema, Vec::new());
+ let mut writer = Writer::new(schema, Vec::new())?;
writer.extend_ser(records)?;
writer.into_inner()
}
diff --git a/avro/examples/benchmark.rs b/avro/examples/benchmark.rs
index b604ab4..7d5922b 100644
--- a/avro/examples/benchmark.rs
+++ b/avro/examples/benchmark.rs
@@ -59,7 +59,7 @@ fn benchmark(
let records = records.clone();
let start = Instant::now();
- let mut writer = Writer::new(schema, BufWriter::new(Vec::new()));
+ let mut writer = Writer::new(schema, BufWriter::new(Vec::new()))?;
writer.extend(records)?;
let duration = Instant::now().duration_since(start);
diff --git a/avro/examples/generate_interop_data.rs
b/avro/examples/generate_interop_data.rs
index 7b3c369..47cb8ea 100644
--- a/avro/examples/generate_interop_data.rs
+++ b/avro/examples/generate_interop_data.rs
@@ -97,7 +97,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let file_name = format!("{}/rust{suffix}.avro", &data_folder);
let output_file = std::fs::File::create(&file_name)?;
- let mut writer = Writer::with_codec(&schema,
BufWriter::new(output_file), codec);
+ let mut writer = Writer::with_codec(&schema,
BufWriter::new(output_file), codec)?;
write_user_metadata(&mut writer)?;
let datum = create_datum(&schema);
diff --git a/avro/src/bigdecimal.rs b/avro/src/bigdecimal.rs
index 0f4e164..6022f0d 100644
--- a/avro/src/bigdecimal.rs
+++ b/avro/src/bigdecimal.rs
@@ -156,7 +156,7 @@ mod tests {
.schema(&schema)
.codec(codec)
.writer(Vec::new())
- .build();
+ .build()?;
writer.append(record.clone())?;
writer.flush()?;
diff --git a/avro/src/lib.rs b/avro/src/lib.rs
index 56eb941..95dffa7 100644
--- a/avro/src/lib.rs
+++ b/avro/src/lib.rs
@@ -208,7 +208,7 @@
//! # "#;
//! # let schema = Schema::parse_str(raw_schema).unwrap();
//! // a writer needs a schema and something to write to
-//! let mut writer = Writer::new(&schema, Vec::new());
+//! let mut writer = Writer::new(&schema, Vec::new()).unwrap();
//!
//! // the Record type models our Record schema
//! let mut record = Record::new(writer.schema()).unwrap();
@@ -263,7 +263,7 @@
//! # "#;
//! # let schema = Schema::parse_str(raw_schema).unwrap();
//! // a writer needs a schema and something to write to
-//! let mut writer = Writer::new(&schema, Vec::new());
+//! let mut writer = Writer::new(&schema, Vec::new()).unwrap();
//!
//! // the structure models our Record schema
//! let test = Test {
@@ -353,7 +353,7 @@
//! # }
//! # "#;
//! # let schema = Schema::parse_str(raw_schema).unwrap();
-//! # let mut writer = Writer::new(&schema, Vec::new());
+//! # let mut writer = Writer::new(&schema, Vec::new()).unwrap();
//! # let mut record = Record::new(writer.schema()).unwrap();
//! # record.put("a", 27i64);
//! # record.put("b", "foo");
@@ -382,7 +382,7 @@
//! # }
//! # "#;
//! # let writer_schema = Schema::parse_str(writer_raw_schema).unwrap();
-//! # let mut writer = Writer::new(&writer_schema, Vec::new());
+//! # let mut writer = Writer::new(&writer_schema, Vec::new()).unwrap();
//! # let mut record = Record::new(writer.schema()).unwrap();
//! # record.put("a", 27i64);
//! # record.put("b", "foo");
@@ -442,7 +442,7 @@
//! # "#;
//! # let schema = Schema::parse_str(raw_schema).unwrap();
//! # let schema = Schema::parse_str(raw_schema).unwrap();
-//! # let mut writer = Writer::new(&schema, Vec::new());
+//! # let mut writer = Writer::new(&schema, Vec::new()).unwrap();
//! # let mut record = Record::new(writer.schema()).unwrap();
//! # record.put("a", 27i64);
//! # record.put("b", "foo");
@@ -487,7 +487,7 @@
//! # }
//! # "#;
//! # let schema = Schema::parse_str(raw_schema).unwrap();
-//! # let mut writer = Writer::new(&schema, Vec::new());
+//! # let mut writer = Writer::new(&schema, Vec::new()).unwrap();
//! # let test = Test {
//! # a: 27,
//! # b: "foo".to_owned(),
@@ -533,7 +533,7 @@
//!
//! println!("{:?}", schema);
//!
-//! let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Deflate(DeflateSettings::default()));
+//! let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Deflate(DeflateSettings::default())).unwrap();
//!
//! let mut record = Record::new(writer.schema()).unwrap();
//! record.put("a", 27i64);
@@ -659,7 +659,7 @@
//!
//! println!("{:?}", schema);
//!
-//! let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Deflate(DeflateSettings::default()));
+//! let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Deflate(DeflateSettings::default())).unwrap() ;
//!
//! let mut record = Record::new(writer.schema()).unwrap();
//! record.put("decimal_fixed",
Decimal::from(9936.to_bigint().unwrap().to_signed_bytes_be()));
@@ -921,7 +921,7 @@
//! ];
//!
//! // Serialize records to Avro binary format with the schema
-//! let mut writer = apache_avro::Writer::new(&schema, Vec::new());
+//! let mut writer = apache_avro::Writer::new(&schema, Vec::new()).unwrap();
//! for record in &records {
//! writer.append_ser(record).unwrap();
//! }
@@ -1080,7 +1080,7 @@ mod tests {
"#;
let writer_schema = Schema::parse_str(writer_raw_schema).unwrap();
let reader_schema = Schema::parse_str(reader_raw_schema).unwrap();
- let mut writer = Writer::with_codec(&writer_schema, Vec::new(),
Codec::Null);
+ let mut writer = Writer::with_codec(&writer_schema, Vec::new(),
Codec::Null).unwrap();
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
@@ -1121,7 +1121,7 @@ mod tests {
}
"#;
let schema = Schema::parse_str(raw_schema).unwrap();
- let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
+ let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Null).unwrap();
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
@@ -1163,7 +1163,7 @@ mod tests {
}
"#;
let writer_schema = Schema::parse_str(writer_raw_schema).unwrap();
- let mut writer = Writer::with_codec(&writer_schema, Vec::new(),
Codec::Null);
+ let mut writer = Writer::with_codec(&writer_schema, Vec::new(),
Codec::Null).unwrap();
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
@@ -1196,7 +1196,7 @@ mod tests {
let schema = Schema::parse_str(raw_schema).unwrap();
- // Would allocated 18446744073709551605 bytes
+ // Would allocate 18446744073709551605 bytes
let illformed: &[u8] = &[0x3e, 0x15, 0xff, 0x1f, 0x15, 0xff];
let value = from_avro_datum(&schema, &mut &*illformed, None);
diff --git a/avro/src/reader.rs b/avro/src/reader.rs
index e2f7570..ec7412c 100644
--- a/avro/src/reader.rs
+++ b/avro/src/reader.rs
@@ -812,7 +812,7 @@ mod tests {
use crate::writer::Writer;
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
let mut user_meta_data: HashMap<String, Vec<u8>> = HashMap::new();
user_meta_data.insert(
diff --git a/avro/src/schema.rs b/avro/src/schema.rs
index 582e104..5839578 100644
--- a/avro/src/schema.rs
+++ b/avro/src/schema.rs
@@ -5249,7 +5249,7 @@ mod tests {
let avro_value = crate::to_value(foo)?;
assert!(avro_value.validate(&schema));
- let mut writer = crate::Writer::new(&schema, Vec::new());
+ let mut writer = crate::Writer::new(&schema, Vec::new())?;
// schema validation happens here
writer.append(avro_value)?;
diff --git a/avro/src/schema_compatibility.rs b/avro/src/schema_compatibility.rs
index 3fec12a..be75b91 100644
--- a/avro/src/schema_compatibility.rs
+++ b/avro/src/schema_compatibility.rs
@@ -1451,7 +1451,7 @@ mod tests {
"#;
let writer_schema = Schema::parse_str(writer_raw_schema)?;
let reader_schema = Schema::parse_str(reader_raw_schema)?;
- let mut writer = Writer::with_codec(&writer_schema, Vec::new(),
Codec::Null);
+ let mut writer = Writer::with_codec(&writer_schema, Vec::new(),
Codec::Null)?;
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
@@ -1515,7 +1515,7 @@ mod tests {
"#;
let writer_schema = Schema::parse_str(writer_raw_schema)?;
let reader_schema = Schema::parse_str(reader_raw_schema)?;
- let mut writer = Writer::with_codec(&writer_schema, Vec::new(),
Codec::Null);
+ let mut writer = Writer::with_codec(&writer_schema, Vec::new(),
Codec::Null)?;
let mut record = Record::new(writer.schema()).unwrap();
record.put("a", 27i64);
record.put("b", "foo");
diff --git a/avro/src/writer.rs b/avro/src/writer.rs
index 9c87991..4bf2cde 100644
--- a/avro/src/writer.rs
+++ b/avro/src/writer.rs
@@ -38,49 +38,72 @@ const AVRO_OBJECT_HEADER: &[u8] = b"Obj\x01";
/// It is critical to call flush before `Writer<W>` is dropped. Though
dropping will attempt to flush
/// the contents of the buffer, any errors that happen in the process of
dropping will be ignored.
/// Calling flush ensures that the buffer is empty and thus dropping will not
even attempt file operations.
-#[derive(bon::Builder)]
pub struct Writer<'a, W: Write> {
schema: &'a Schema,
writer: W,
- #[builder(skip)]
- resolved_schema: Option<ResolvedSchema<'a>>,
- #[builder(default = Codec::Null)]
+ resolved_schema: ResolvedSchema<'a>,
codec: Codec,
- #[builder(default = DEFAULT_BLOCK_SIZE)]
block_size: usize,
- #[builder(skip = Vec::with_capacity(block_size))]
buffer: Vec<u8>,
- #[builder(skip)]
num_values: usize,
- #[builder(default = generate_sync_marker())]
marker: [u8; 16],
- /// Has the header already been written.
- ///
- /// To disable writing the header, this can be set to `true`.
- #[builder(default = false)]
has_header: bool,
- #[builder(default)]
user_metadata: HashMap<String, Value>,
}
+#[bon::bon]
+impl<'a, W: Write> Writer<'a, W> {
+ #[builder]
+ pub fn builder(
+ schema: &'a Schema,
+ schemata: Option<Vec<&'a Schema>>,
+ writer: W,
+ #[builder(default = Codec::Null)] codec: Codec,
+ #[builder(default = DEFAULT_BLOCK_SIZE)] block_size: usize,
+ #[builder(default = generate_sync_marker())] marker: [u8; 16],
+ /// Has the header already been written.
+ ///
+ /// To disable writing the header, this can be set to `true`.
+ #[builder(default = false)]
+ has_header: bool,
+ #[builder(default)] user_metadata: HashMap<String, Value>,
+ ) -> AvroResult<Self> {
+ let resolved_schema = if let Some(schemata) = schemata {
+ ResolvedSchema::try_from(schemata)?
+ } else {
+ ResolvedSchema::try_from(schema)?
+ };
+ Ok(Self {
+ schema,
+ writer,
+ resolved_schema,
+ codec,
+ block_size,
+ buffer: Vec::with_capacity(block_size),
+ num_values: 0,
+ marker,
+ has_header,
+ user_metadata,
+ })
+ }
+}
+
impl<'a, W: Write> Writer<'a, W> {
/// Creates a `Writer` given a `Schema` and something implementing the
`io::Write` trait to write
/// to.
/// No compression `Codec` will be used.
- pub fn new(schema: &'a Schema, writer: W) -> Self {
+ pub fn new(schema: &'a Schema, writer: W) -> AvroResult<Self> {
Writer::with_codec(schema, writer, Codec::Null)
}
/// Creates a `Writer` with a specific `Codec` given a `Schema` and
something implementing the
/// `io::Write` trait to write to.
- pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) -> Self {
- let mut w = Self::builder()
+ pub fn with_codec(schema: &'a Schema, writer: W, codec: Codec) ->
AvroResult<Self> {
+ Self::builder()
.schema(schema)
.writer(writer)
.codec(codec)
- .build();
- w.resolved_schema = ResolvedSchema::try_from(schema).ok();
- w
+ .build()
}
/// Creates a `Writer` with a specific `Codec` given a `Schema` and
something implementing the
@@ -92,20 +115,19 @@ impl<'a, W: Write> Writer<'a, W> {
schemata: Vec<&'a Schema>,
writer: W,
codec: Codec,
- ) -> Self {
- let mut w = Self::builder()
+ ) -> AvroResult<Self> {
+ Self::builder()
.schema(schema)
+ .schemata(schemata)
.writer(writer)
.codec(codec)
- .build();
- w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
- w
+ .build()
}
/// Creates a `Writer` that will append values to already populated
/// `std::io::Write` using the provided `marker`
/// No compression `Codec` will be used.
- pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) -> Self {
+ pub fn append_to(schema: &'a Schema, writer: W, marker: [u8; 16]) ->
AvroResult<Self> {
Writer::append_to_with_codec(schema, writer, Codec::Null, marker)
}
@@ -116,16 +138,14 @@ impl<'a, W: Write> Writer<'a, W> {
writer: W,
codec: Codec,
marker: [u8; 16],
- ) -> Self {
- let mut w = Self::builder()
+ ) -> AvroResult<Self> {
+ Self::builder()
.schema(schema)
.writer(writer)
.codec(codec)
.marker(marker)
.has_header(true)
- .build();
- w.resolved_schema = ResolvedSchema::try_from(schema).ok();
- w
+ .build()
}
/// Creates a `Writer` that will append values to already populated
@@ -136,16 +156,15 @@ impl<'a, W: Write> Writer<'a, W> {
writer: W,
codec: Codec,
marker: [u8; 16],
- ) -> Self {
- let mut w = Self::builder()
+ ) -> AvroResult<Self> {
+ Self::builder()
.schema(schema)
+ .schemata(schemata)
.writer(writer)
.codec(codec)
.marker(marker)
.has_header(true)
- .build();
- w.resolved_schema = ResolvedSchema::try_from(schemata).ok();
- w
+ .build()
}
/// Get a reference to the `Schema` associated to a `Writer`.
@@ -178,24 +197,14 @@ impl<'a, W: Write> Writer<'a, W> {
pub fn append_value_ref(&mut self, value: &Value) -> AvroResult<usize> {
let n = self.maybe_write_header()?;
- // Lazy init for users using the builder pattern with error throwing
- match self.resolved_schema {
- Some(ref rs) => {
- write_value_ref_resolved(self.schema, rs, value, &mut
self.buffer)?;
- self.num_values += 1;
-
- if self.buffer.len() >= self.block_size {
- return self.flush().map(|b| b + n);
- }
+ write_value_ref_resolved(self.schema, &self.resolved_schema, value,
&mut self.buffer)?;
+ self.num_values += 1;
- Ok(n)
- }
- None => {
- let rs = ResolvedSchema::try_from(self.schema)?;
- self.resolved_schema = Some(rs);
- self.append_value_ref(value)
- }
+ if self.buffer.len() >= self.block_size {
+ return self.flush().map(|b| b + n);
}
+
+ Ok(n)
}
/// Append anything implementing the `Serialize` trait to a `Writer` for
@@ -210,29 +219,20 @@ impl<'a, W: Write> Writer<'a, W> {
pub fn append_ser<S: Serialize>(&mut self, value: S) -> AvroResult<usize> {
let n = self.maybe_write_header()?;
- match self.resolved_schema {
- Some(ref rs) => {
- let mut serializer = SchemaAwareWriteSerializer::new(
- &mut self.buffer,
- self.schema,
- rs.get_names(),
- None,
- );
- value.serialize(&mut serializer)?;
- self.num_values += 1;
-
- if self.buffer.len() >= self.block_size {
- return self.flush().map(|b| b + n);
- }
+ let mut serializer = SchemaAwareWriteSerializer::new(
+ &mut self.buffer,
+ self.schema,
+ self.resolved_schema.get_names(),
+ None,
+ );
+ value.serialize(&mut serializer)?;
+ self.num_values += 1;
- Ok(n)
- }
- None => {
- let rs = ResolvedSchema::try_from(self.schema)?;
- self.resolved_schema = Some(rs);
- self.append_ser(value)
- }
+ if self.buffer.len() >= self.block_size {
+ return self.flush().map(|b| b + n);
}
+
+ Ok(n)
}
/// Extend a `Writer` with an `Iterator` of compatible values
(implementing the `ToAvro`
@@ -366,9 +366,10 @@ impl<'a, W: Write> Writer<'a, W> {
let mut this = ManuallyDrop::new(self);
// Extract every member that is not Copy and therefore should be
dropped
- let _resolved_schema = std::mem::take(&mut this.resolved_schema);
let _buffer = std::mem::take(&mut this.buffer);
let _user_metadata = std::mem::take(&mut this.user_metadata);
+ // SAFETY: resolved schema is not accessed after this and won't be
dropped because of ManuallyDrop
+ unsafe { std::ptr::drop_in_place(&mut this.resolved_schema) };
// SAFETY: double-drops are prevented by putting `this` in a
ManuallyDrop that is never dropped
let writer = unsafe { std::ptr::read(&this.writer) };
@@ -861,7 +862,7 @@ mod tests {
let schema = Schema::parse_str(SCHEMA)?;
// By default flush should write the header even if nothing was added
yet
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
writer.flush()?;
let result = writer.into_inner()?;
assert_eq!(result.len(), 163);
@@ -871,7 +872,7 @@ mod tests {
.has_header(true)
.schema(&schema)
.writer(Vec::new())
- .build();
+ .build()?;
writer.flush()?;
let result = writer.into_inner()?;
assert_eq!(result.len(), 0);
@@ -1053,7 +1054,7 @@ mod tests {
#[test]
fn test_writer_append() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
let mut record = Record::new(&schema).unwrap();
record.put("a", 27i64);
@@ -1087,7 +1088,7 @@ mod tests {
#[test]
fn test_writer_extend() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
let mut record = Record::new(&schema).unwrap();
record.put("a", 27i64);
@@ -1128,7 +1129,7 @@ mod tests {
#[test]
fn test_writer_append_ser() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
let record = TestSerdeSerialize {
a: 27,
@@ -1161,7 +1162,7 @@ mod tests {
#[test]
fn test_writer_extend_ser() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
let record = TestSerdeSerialize {
a: 27,
@@ -1194,7 +1195,7 @@ mod tests {
Ok(())
}
- fn make_writer_with_codec(schema: &Schema) -> Writer<'_, Vec<u8>> {
+ fn make_writer_with_codec(schema: &Schema) -> AvroResult<Writer<'_,
Vec<u8>>> {
Writer::with_codec(
schema,
Vec::new(),
@@ -1202,7 +1203,7 @@ mod tests {
)
}
- fn make_writer_with_builder(schema: &Schema) -> Writer<'_, Vec<u8>> {
+ fn make_writer_with_builder(schema: &Schema) -> AvroResult<Writer<'_,
Vec<u8>>> {
Writer::builder()
.writer(Vec::new())
.schema(schema)
@@ -1245,14 +1246,14 @@ mod tests {
#[test]
fn test_writer_with_codec() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let writer = make_writer_with_codec(&schema);
+ let writer = make_writer_with_codec(&schema)?;
check_writer(writer, &schema)
}
#[test]
fn test_writer_with_builder() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let writer = make_writer_with_builder(&schema);
+ let writer = make_writer_with_builder(&schema)?;
check_writer(writer, &schema)
}
@@ -1282,7 +1283,7 @@ mod tests {
.schema(&schema)
.codec(codec)
.writer(Vec::new())
- .build();
+ .build()?;
let mut record1 = Record::new(&schema).unwrap();
record1.put(
@@ -1324,7 +1325,7 @@ mod tests {
#[test]
fn test_avro_3405_writer_add_metadata_success() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
writer.add_user_metadata("stringKey".to_string(),
String::from("stringValue"))?;
writer.add_user_metadata("strKey".to_string(), "strValue")?;
@@ -1348,7 +1349,7 @@ mod tests {
#[test]
fn test_avro_3881_metadata_empty_body() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
writer.add_user_metadata("a".to_string(), "b")?;
let result = writer.into_inner()?;
@@ -1364,7 +1365,7 @@ mod tests {
#[test]
fn test_avro_3405_writer_add_metadata_failure() -> TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
let mut record = Record::new(&schema).unwrap();
record.put("a", 27i64);
@@ -1388,7 +1389,7 @@ mod tests {
#[test]
fn test_avro_3405_writer_add_metadata_reserved_prefix_failure() ->
TestResult {
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
let key = "avro.stringKey".to_string();
match writer
@@ -1430,7 +1431,7 @@ mod tests {
.writer(Vec::new())
.schema(&schema)
.user_metadata(user_meta_data.clone())
- .build();
+ .build()?;
assert_eq!(writer.user_metadata, user_meta_data);
@@ -1612,7 +1613,7 @@ mod tests {
};
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
let bytes = writer.append_ser(conf)?;
@@ -1645,7 +1646,7 @@ mod tests {
};
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new())?;
match writer.append_ser(conf) {
Ok(bytes) => panic!("Expected an error, but got {bytes} bytes
written"),
@@ -1696,7 +1697,7 @@ mod tests {
let schema = Schema::parse_str(SCHEMA)?;
- let mut writer = Writer::new(&schema, buffered_writer);
+ let mut writer = Writer::new(&schema, buffered_writer)?;
let mut record = Record::new(writer.schema()).unwrap();
record.put("exampleField", "value");
diff --git a/avro/tests/append_to_existing.rs b/avro/tests/append_to_existing.rs
index e9af4b3..ad702f8 100644
--- a/avro/tests/append_to_existing.rs
+++ b/avro/tests/append_to_existing.rs
@@ -37,7 +37,7 @@ fn avro_3630_append_to_an_existing_file() -> TestResult {
let marker = read_marker(&bytes[..]);
- let mut writer = Writer::append_to(&schema, bytes, marker);
+ let mut writer = Writer::append_to(&schema, bytes, marker)?;
writer
.append(create_datum(&schema, 2))
@@ -59,7 +59,10 @@ fn avro_3630_append_to_an_existing_file() -> TestResult {
fn avro_4031_append_to_file_using_multiple_writers() -> TestResult {
let schema = Schema::parse_str(SCHEMA).expect("Cannot parse the schema");
- let mut first_writer =
Writer::builder().schema(&schema).writer(Vec::new()).build();
+ let mut first_writer = Writer::builder()
+ .schema(&schema)
+ .writer(Vec::new())
+ .build()?;
first_writer.append(create_datum(&schema, -42))?;
let mut resulting_bytes = first_writer.into_inner()?;
let first_marker = read_marker(&resulting_bytes);
@@ -69,7 +72,7 @@ fn avro_4031_append_to_file_using_multiple_writers() ->
TestResult {
.has_header(true)
.marker(first_marker)
.writer(Vec::new())
- .build();
+ .build()?;
second_writer.append(create_datum(&schema, 42))?;
resulting_bytes.append(&mut second_writer.into_inner()?);
@@ -81,7 +84,7 @@ fn avro_4031_append_to_file_using_multiple_writers() ->
TestResult {
/// Simulates reading from a pre-existing .avro file and returns its bytes
fn get_avro_bytes(schema: &Schema) -> Vec<u8> {
- let mut writer = Writer::new(schema, Vec::new());
+ let mut writer = Writer::new(schema, Vec::new()).unwrap();
writer
.append(create_datum(schema, 1))
.expect("An error while appending data");
diff --git a/avro/tests/avro-rs-226.rs b/avro/tests/avro-rs-226.rs
index 10dc80d..385b42e 100644
--- a/avro/tests/avro-rs-226.rs
+++ b/avro/tests/avro-rs-226.rs
@@ -25,7 +25,7 @@ where
T: Serialize + DeserializeOwned + Debug + PartialEq + Clone,
{
let record2 = record.clone();
- let mut writer = Writer::new(schema, vec![]);
+ let mut writer = Writer::new(schema, vec![])?;
writer.append_ser(record)?;
let bytes_written = writer.into_inner()?;
diff --git a/avro/tests/avro-rs-285-bytes_deserialization.rs
b/avro/tests/avro-rs-285-bytes_deserialization.rs
index b2e97bf..1262cea 100644
--- a/avro/tests/avro-rs-285-bytes_deserialization.rs
+++ b/avro/tests/avro-rs-285-bytes_deserialization.rs
@@ -45,7 +45,7 @@ fn avro_rs_285_bytes_deserialization_round_trip() ->
TestResult {
];
// serialize records to Avro binary format with schema
- let mut writer = apache_avro::Writer::new(&schema, Vec::new());
+ let mut writer = apache_avro::Writer::new(&schema, Vec::new())?;
for record in &records {
writer.append_ser(record)?;
}
@@ -93,7 +93,7 @@ fn avro_rs_285_bytes_deserialization_filtered_round_trip() ->
TestResult {
];
// serialize records to Avro binary format with schema
- let mut writer = apache_avro::Writer::new(&schema, Vec::new());
+ let mut writer = apache_avro::Writer::new(&schema, Vec::new())?;
for record in &records {
writer.append_ser(record)?;
}
diff --git a/avro/tests/codecs.rs b/avro/tests/codecs.rs
index f58f770..89471f4 100644
--- a/avro/tests/codecs.rs
+++ b/avro/tests/codecs.rs
@@ -72,7 +72,7 @@ fn avro_4032_codec_settings(codec: Codec) -> TestResult {
}"#,
)?;
- let mut writer = Writer::with_codec(&schema, Vec::new(), codec);
+ let mut writer = Writer::with_codec(&schema, Vec::new(), codec)?;
let mut record = Record::new(writer.schema()).unwrap();
record.put("f1", 27_i32);
record.put("f2", "foo");
diff --git a/avro/tests/schema.rs b/avro/tests/schema.rs
index 92306c3..3c34230 100644
--- a/avro/tests/schema.rs
+++ b/avro/tests/schema.rs
@@ -811,7 +811,7 @@ fn test_record_schema_with_cyclic_references() ->
TestResult {
]),
);
- let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
+ let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null)?;
if let Err(err) = writer.append(datum) {
panic!("An error occurred while writing datum: {err:?}")
}
@@ -1038,7 +1038,7 @@ fn test_avro_3847_union_field_with_default_value_of_ref()
-> TestResult {
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Record(vec![("f1_1".to_string(), 10.into())]));
writer.append(record)?;
@@ -1111,7 +1111,7 @@ fn test_avro_3847_union_field_with_default_value_of_ref()
-> TestResult {
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Enum(1, "b".to_string()));
writer.append(record)?;
@@ -1171,7 +1171,7 @@ fn test_avro_3847_union_field_with_default_value_of_ref()
-> TestResult {
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Fixed(3, vec![0, 1, 2]));
writer.append(record)?;
@@ -1242,7 +1242,7 @@ fn
test_avro_3847_union_field_with_default_value_of_ref_with_namespace() -> Test
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Record(vec![("f1_1".to_string(), 10.into())]));
writer.append(record)?;
@@ -1317,7 +1317,7 @@ fn
test_avro_3847_union_field_with_default_value_of_ref_with_namespace() -> Test
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Enum(1, "b".to_string()));
writer.append(record)?;
@@ -1379,7 +1379,7 @@ fn
test_avro_3847_union_field_with_default_value_of_ref_with_namespace() -> Test
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Fixed(3, vec![0, 1, 2]));
writer.append(record)?;
@@ -1451,7 +1451,7 @@ fn
test_avro_3847_union_field_with_default_value_of_ref_with_enclosing_namespace
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Record(vec![("f1_1".to_string(), 10.into())]));
writer.append(record)?;
@@ -1526,7 +1526,7 @@ fn
test_avro_3847_union_field_with_default_value_of_ref_with_enclosing_namespace
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Enum(1, "b".to_string()));
writer.append(record)?;
@@ -1588,7 +1588,7 @@ fn
test_avro_3847_union_field_with_default_value_of_ref_with_enclosing_namespace
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Fixed(3, vec![0, 1, 2]));
writer.append(record)?;
@@ -1649,7 +1649,7 @@ fn write_schema_for_default_value_test() ->
apache_avro::AvroResult<Vec<u8>> {
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema())
.ok_or("Expected Some(Record), but got None")
.unwrap();
@@ -1932,7 +1932,7 @@ fn
test_avro_3851_read_default_value_for_ref_record_field() -> TestResult {
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
let mut record = Record::new(writer.schema()).ok_or("Expected
Some(Record), but got None")?;
record.put("f1", Value::Record(vec![("f1_1".to_string(), 10.into())]));
writer.append(record)?;
@@ -1997,7 +1997,7 @@ fn test_avro_3851_read_default_value_for_enum() ->
TestResult {
}
"#;
let writer_schema = Schema::parse_str(writer_schema_str)?;
- let mut writer = Writer::new(&writer_schema, Vec::new());
+ let mut writer = Writer::new(&writer_schema, Vec::new())?;
writer.append("c")?;
let reader_schema_str = r#"
@@ -2364,7 +2364,7 @@ fn
avro_rs_66_test_independent_canonical_form_missing_ref() -> TestResult {
fn avro_rs_181_single_null_record() -> TestResult {
let mut buff = Cursor::new(Vec::new());
let schema = Schema::parse_str(r#""null""#)?;
- let mut writer = Writer::new(&schema, &mut buff);
+ let mut writer = Writer::new(&schema, &mut buff)?;
writer.append(serde_json::Value::Null)?;
writer.into_inner()?;
buff.set_position(0);
diff --git a/avro/tests/shared.rs b/avro/tests/shared.rs
index c9397df..f23c837 100644
--- a/avro/tests/shared.rs
+++ b/avro/tests/shared.rs
@@ -115,7 +115,7 @@ fn test_folder(folder: &str) -> Result<(), ErrorsDesc> {
let reader =
Reader::with_schema(&schema, BufReader::new(&file)).expect("Can't
read data.avro");
- let mut writer = Writer::with_codec(&schema, Vec::new(), Codec::Null);
+ let mut writer = Writer::with_codec(&schema, Vec::new(),
Codec::Null).unwrap();
let mut records: Vec<Value> = vec![];
diff --git a/avro/tests/to_from_avro_datum_schemata.rs
b/avro/tests/to_from_avro_datum_schemata.rs
index c300c0f..b5389fe 100644
--- a/avro/tests/to_from_avro_datum_schemata.rs
+++ b/avro/tests/to_from_avro_datum_schemata.rs
@@ -107,7 +107,7 @@ fn test_avro_3683_multiple_schemata_writer_reader() ->
TestResult {
let schema_b = schemata[1];
let mut output: Vec<u8> = Vec::new();
- let mut writer = Writer::with_schemata(schema_b, schemata.clone(), &mut
output, Codec::Null);
+ let mut writer = Writer::with_schemata(schema_b, schemata.clone(), &mut
output, Codec::Null)?;
writer.append(record.clone())?;
writer.flush()?;
drop(writer); //drop the writer so that `output` is no more referenced
mutably
diff --git a/avro/tests/union_schema.rs b/avro/tests/union_schema.rs
index b9c22ff..6e48e47 100644
--- a/avro/tests/union_schema.rs
+++ b/avro/tests/union_schema.rs
@@ -71,7 +71,7 @@ where
{
let mut encoded: Vec<u8> = Vec::new();
let mut writer =
- Writer::with_schemata(schema, schemata.iter().collect(), &mut encoded,
Codec::Null);
+ Writer::with_schemata(schema, schemata.iter().collect(), &mut encoded,
Codec::Null)?;
writer.append_ser(input)?;
writer.flush()?;
drop(writer); //drop the writer so that `encoded` is no more referenced
mutably
diff --git a/avro_derive/README.md b/avro_derive/README.md
index 0098d38..f72d582 100644
--- a/avro_derive/README.md
+++ b/avro_derive/README.md
@@ -56,7 +56,7 @@ struct Test {
// derived schema, always valid or code fails to compile with a descriptive
message
let schema = Test::get_schema();
-let mut writer = Writer::new(&schema, Vec::new());
+let mut writer = Writer::new(&schema, Vec::new()).unwrap();
let test = Test {
a: 27,
b: "foo".to_owned(),
diff --git a/avro_derive/tests/derive.rs b/avro_derive/tests/derive.rs
index 4e0e0fb..8d92c57 100644
--- a/avro_derive/tests/derive.rs
+++ b/avro_derive/tests/derive.rs
@@ -52,7 +52,7 @@ mod test_derive {
T: Serialize + AvroSchema,
{
let schema = T::get_schema();
- let mut writer = Writer::new(&schema, Vec::new());
+ let mut writer = Writer::new(&schema, Vec::new()).unwrap();
if let Err(e) = writer.append_ser(obj) {
panic!("{e:?}");
}
diff --git a/wasm-demo/tests/demos.rs b/wasm-demo/tests/demos.rs
index 30bbe9c..6bd0842 100644
--- a/wasm-demo/tests/demos.rs
+++ b/wasm-demo/tests/demos.rs
@@ -68,7 +68,8 @@ fn write_read() {
&schema,
BufWriter::new(Vec::with_capacity(200)),
Codec::Null,
- );
+ )
+ .unwrap();
writer.append(record).unwrap();
writer.flush().unwrap();
let bytes = writer.into_inner().unwrap().into_inner().unwrap();