martin-g commented on code in PR #512: URL: https://github.com/apache/avro-rs/pull/512#discussion_r2965512779
########## avro/src/documentation/avro_data_model_to_serde.rs: ########## @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Mapping the Avro data model to the Serde data model +//! +//! When manually mapping an Avro schema to Rust types it is important to understand +//! how the different data models are mapped. When mapping from Rust types to an Avro schema, +//! see [the documentation for the reverse](super::serde_data_model_to_avro). +//! +//! Only the the mapping as defined here is supported. Any other behavior might change in a minor version. Review Comment: ```suggestion //! Only the mapping as defined here is supported. Any other behavior might change in a minor version. ``` ########## avro/src/schema/union.rs: ########## @@ -74,7 +81,116 @@ impl UnionSchema { &self.schemas } - /// Returns true if the any of the variants of this `UnionSchema` is `Null`. + /// Get the variant at the given index. + pub fn get_variant(&self, index: usize) -> Result<&Schema, Error> { + self.schemas.get(index).ok_or_else(|| { + Details::GetUnionVariant { + index: index as i64, + num_variants: self.schemas.len(), + } + .into() + }) + } + + /// Get the index of the provided [`SchemaKind`]. 
+ /// + /// The schema must not be a logical type, as only the base type are saved in the lookup index. + pub(crate) fn index_of_schema_kind(&self, kind: SchemaKind) -> Option<usize> { + self.variant_index.get(&kind).copied() + } + + /// Get the index and schema for the provided name. + /// + /// Will use `names` to resolve references. + pub(crate) fn find_named_schema<'s>( + &'s self, + name: &str, + names: &'s HashMap<Name, impl Borrow<Schema>>, + ) -> Result<Option<(usize, &'s Schema)>, Error> { + for index in self.named_index.iter().copied() { + let schema = &self.schemas[index]; + if let Some(schema_name) = schema.name() + && schema_name.name() == name + { + let schema = if let Schema::Ref { name } = schema { + names + .get(name) + .ok_or_else(|| Details::SchemaResolutionError(name.clone()))? + .borrow() + } else { + schema + }; + return Ok(Some((index, schema))); + } + } + Ok(None) + } + + /// Find a [`Schema::Fixed`] with the given size. + /// + /// Will use `names` to resolve references. + pub(crate) fn find_fixed_of_size_n<'s>( + &'s self, + size: usize, + names: &'s HashMap<Name, impl Borrow<Schema>>, + ) -> Result<Option<(usize, &'s FixedSchema)>, Error> { + for index in self.named_index.iter().copied() { + let schema = &self.schemas[index]; + let schema = if let Schema::Ref { name } = schema { + names + .get(name) + .ok_or_else(|| Details::SchemaResolutionError(name.clone()))? + .borrow() + } else { + schema + }; + match schema { + Schema::Fixed(fixed) + | Schema::Uuid(UuidSchema::Fixed(fixed)) + | Schema::Decimal(DecimalSchema { + inner: InnerDecimalSchema::Fixed(fixed), + .. + }) + | Schema::Duration(fixed) + if fixed.size == size => + { + return Ok(Some((index, fixed))); + } + _ => {} + } + } + Ok(None) + } + + /// Find a [`Schema::Record`] with `n` fields. + /// + /// Will use `names` to resolve references. + pub(crate) fn find_record_with_n_fields<'s>( Review Comment: What is the idea here ? 
If a union schema contains several record schemas with the same number of fields but different field types, the first one wins. Why do we need this? ########## avro/src/reader/datum.rs: ########## @@ -124,6 +137,68 @@ impl<'s> GenericDatumReader<'s> { Ok(value) } } + + /// Read a Avro datum from the reader. + /// + /// # Panics + /// Will panic if a reader schema has been configured, this is a WIP. + pub fn read_deser<T: DeserializeOwned>(&self, reader: &mut impl Read) -> AvroResult<T> { + // `reader` is `impl Read` instead of a generic on the function like T so it's easier to + // specify the type wanted (`read_deser<String>` vs `read_deser<String, _>`) + if let Some((_, _)) = &self.reader { + todo!("Schema aware deserialisation does not resolve schemas yet"); Review Comment: Is this `todo!()` meant to be resolved in the current PR or in a follow-up? ########## avro/src/serde/ser_schema/record/field_default.rs: ########## @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use serde::{Serialize, Serializer, ser::Error}; +use serde_json::Value; + +use crate::{ + Schema, + schema::{SchemaKind, UnionSchema, UuidSchema}, + serde::ser_schema::SERIALIZING_SCHEMA_DEFAULT, +}; + +pub struct SchemaAwareRecordFieldDefault<'v, 's> { + value: &'v Value, + schema: &'s Schema, +} + +impl<'v, 's> SchemaAwareRecordFieldDefault<'v, 's> { + pub fn new(value: &'v Value, schema: &'s Schema) -> Self { + SchemaAwareRecordFieldDefault { value, schema } + } + + fn serialize_as_newtype_variant<S: Serializer>( + &self, + serializer: S, + index: usize, + union: &'s UnionSchema, + ) -> Result<S::Ok, S::Error> { + let value = Self::new(self.value, &union.variants()[index]); + serializer.serialize_newtype_variant( + SERIALIZING_SCHEMA_DEFAULT, + index as u32, + SERIALIZING_SCHEMA_DEFAULT, + &value, + ) + } + + fn recursive_type_check(value: &Value, schema: &'s Schema) -> bool { + match (value, schema) { + (Value::Null, Schema::Null) + | (Value::Bool(_), Schema::Boolean) + | (Value::String(_), Schema::Bytes | Schema::String) => true, + (Value::Number(n), Schema::Int | Schema::Date | Schema::TimeMillis) if n.is_i64() => { + let long = n.as_i64().unwrap(); + i32::try_from(long).is_ok() + } + ( + Value::Number(n), + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos, + ) if n.is_i64() => true, + (Value::Number(n), Schema::Float | Schema::Double) if n.is_f64() => true, Review Comment: ```suggestion (Value::Number(n), Schema::Float | Schema::Double) if n.as_f64().is_some() => true, ``` to support also integers as defaults for floating point schema types ########## avro/src/serde/ser_schema/record/mod.rs: ########## @@ -0,0 +1,270 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +mod field_default; + +use std::{borrow::Borrow, cmp::Ordering, collections::HashMap, io::Write}; + +use serde::{ + Serialize, + ser::{SerializeMap, SerializeStruct, SerializeStructVariant}, +}; + +use super::{Config, SchemaAwareSerializer}; +use crate::{ + Error, Schema, + error::Details, + schema::RecordSchema, + serde::{ + ser_schema::record::field_default::SchemaAwareRecordFieldDefault, util::StringSerializer, + }, +}; + +pub struct RecordSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + writer: &'w mut W, + record: &'s RecordSchema, + config: Config<'s, S>, + /// Cache fields received out-of-order + cache: HashMap<usize, Vec<u8>>, + /// The position of the current map entry being written + map_position: Option<usize>, + /// The field that should be written now. 
+ field_position: usize, + bytes_written: usize, +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> RecordSerializer<'s, 'w, W, S> { + pub fn new( + writer: &'w mut W, + record: &'s RecordSchema, + config: Config<'s, S>, + bytes_written: Option<usize>, + ) -> Self { + Self { + writer, + record, + config, + cache: HashMap::new(), + map_position: None, + field_position: 0, + bytes_written: bytes_written.unwrap_or(0), + } + } + + fn field_error(&self, position: usize, error: Error) -> Error { + let field = &self.record.fields[position]; + let error = match error.into_details() { + Details::SerializeValueWithSchema { + value_type, + value, + schema: _, + } => format!("Failed to serialize value of type `{value_type}`: {value}"), + Details::SerializeRecordFieldWithSchema { + field_name, + record_schema, + error, + } => format!( + "Failed to serialize field '{field_name}' of record {}: {error}", + record_schema.name + ), + Details::MissingDefaultForSkippedField { field_name, schema } => { + format!( + "Missing default for skipped field '{field_name}' for record {}", + schema.name + ) + } + details => format!("{details:?}"), + }; + Error::new(Details::SerializeRecordFieldWithSchema { + field_name: field.name.clone(), + record_schema: self.record.clone(), + error, + }) + } + + fn serialize_next_field<T: ?Sized + Serialize>( + &mut self, + position: usize, + value: &T, + ) -> Result<(), Error> { + let field = &self.record.fields[position]; + match self.field_position.cmp(&position) { + Ordering::Equal => { + // Field received in the right order + self.bytes_written += value + .serialize(SchemaAwareSerializer::new( + self.writer, + &field.schema, + self.config, + )?) 
+ .map_err(|e| self.field_error(self.field_position, e))?; + self.field_position += 1; + + // Write any fields that were already received and can now be written + while let Some(bytes) = self.cache.remove(&self.field_position) { + self.writer.write_all(&bytes).map_err(Details::WriteBytes)?; + self.bytes_written += bytes.len(); + self.field_position += 1; + } + + Ok(()) + } + Ordering::Less => { + // Another field needs to be written first, so cache this field + let mut bytes = Vec::new(); + value + .serialize(SchemaAwareSerializer::new( + &mut bytes, + &field.schema, + self.config, + )?) + .map_err(|e| self.field_error(self.field_position, e))?; + if self.cache.insert(position, bytes).is_some() { + Err(Details::FieldNameDuplicate(field.name.clone()).into()) + } else { + Ok(()) + } + } + Ordering::Greater => { + // This field is already written to the writer so we got a duplicate + Err(Details::FieldNameDuplicate(field.name.clone()).into()) + } + } + } + + fn serialize_default(&mut self, position: usize) -> Result<(), Error> { + let field = &self.record.fields[position]; + if let Some(default) = &field.default { + self.serialize_next_field( + position, + &SchemaAwareRecordFieldDefault::new(default, &field.schema), + ) + .map_err(|e| self.field_error(position, e)) + } else { + Err(Details::MissingDefaultForSkippedField { + field_name: field.name.clone(), + schema: self.record.clone(), + } + .into()) + } + } + + fn end(mut self) -> Result<usize, Error> { + // Write any fields that were skipped by `#[serde(skip)]` or `#[serde(skip_serializing{,_if}]` + while self.field_position != self.record.fields.len() { + self.serialize_default(self.field_position)?; + } + + Ok(self.bytes_written) + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeStruct for RecordSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_field<T>(&mut self, key: &'static str, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + if let 
Some(position) = self.record.lookup.get(key).copied() { + self.serialize_next_field(position, value) + } else { + Err(Details::GetField(key.to_string()).into()) + } + } + + fn skip_field(&mut self, key: &'static str) -> Result<(), Self::Error> { + if let Some(position) = self.record.lookup.get(key).copied() { + self.serialize_default(position) + } else { + Err(Details::GetField(key.to_string()).into()) + } + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + self.end() + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for RecordSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let name = key.serialize(StringSerializer)?; + if let Some(position) = self.record.lookup.get(&name).copied() { + self.map_position = Some(position); + Ok(()) + } else { + Err(Details::FieldName(name.to_string()).into()) + } + } + + fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + self.serialize_next_field( + self.map_position + .expect("serialze_value called without calling serialize_key"), Review Comment: ```suggestion .expect("serialize_value called without calling serialize_key"), ``` ########## avro/src/writer/single_object.rs: ########## @@ -86,12 +89,35 @@ impl GenericSingleObjectWriter { } /// Writer that encodes messages according to the single object encoding v1 spec +#[derive(Builder)] pub struct SpecificSingleObjectWriter<T> where T: AvroSchema, { + #[builder( + with = |schema: Schema| -> Result<_, Error> { ResolvedOwnedSchema::new(schema) }, + default = ResolvedOwnedSchema::new(T::get_schema()).expect("AvroSchema implementation should create valid schemas") + )] resolved: ResolvedOwnedSchema, + #[builder( + default = RabinFingerprintHeader::from_schema(resolved.get_root_schema()).build_header(), + with = |header_builder: impl HeaderBuilder| header_builder.build_header(), + 
)] header: Vec<u8>, + /// Should [`Serialize`] implementations pick a human readable represenation. Review Comment: ```suggestion /// Should [`Serialize`] implementations pick a human readable representation. ``` ########## avro/src/serde/ser_schema/tuple.rs: ########## @@ -0,0 +1,283 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{borrow::Borrow, io::Write}; + +use serde::{ + Serialize, + ser::{SerializeTuple, SerializeTupleStruct, SerializeTupleVariant}, +}; + +use super::{Config, SchemaAwareSerializer, union::UnionSerializer}; +use crate::{ + Error, Schema, + error::Details, + schema::{RecordSchema, UnionSchema}, +}; + +#[expect( + private_interfaces, + reason = "One{,Union}TupleSerializer should not be used directly" +)] +pub enum TupleSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + Unit(usize), + One(OneTupleSerializer<'s, 'w, W, S>), + /// This exists because we can't create a `&Schema::Union` from a `&UnionSchema` + OneUnion(OneUnionTupleSerializer<'s, 'w, W, S>), + Many(ManyTupleSerializer<'s, 'w, W, S>), +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> TupleSerializer<'s, 'w, W, S> { + pub fn unit(bytes_written: Option<usize>) -> Self { + Self::Unit(bytes_written.unwrap_or(0)) + } + + pub fn one( + writer: &'w mut W, + schema: &'s Schema, + config: Config<'s, S>, + bytes_written: Option<usize>, + ) -> Self { + Self::One(OneTupleSerializer::new( + writer, + schema, + config, + bytes_written, + )) + } + + pub fn one_union(writer: &'w mut W, union: &'s UnionSchema, config: Config<'s, S>) -> Self { + Self::OneUnion(OneUnionTupleSerializer::new(writer, union, config)) + } + + pub fn many( + writer: &'w mut W, + schema: &'s RecordSchema, + config: Config<'s, S>, + bytes_written: Option<usize>, + ) -> Self { + Self::Many(ManyTupleSerializer::new( + writer, + schema, + config, + bytes_written, + )) + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeTuple for TupleSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + match self { + TupleSerializer::Unit(_) => Err(Error::new(Details::SerializeValueWithSchema { + value_type: "tuple", + value: "Expected no elements for the unit tuple".into(), + schema: Schema::Null, + })), + 
TupleSerializer::One(one) => one.serialize_element(value), + TupleSerializer::OneUnion(one) => one.serialize_element(value), + TupleSerializer::Many(many) => many.serialize_element(value), + } + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + match self { + TupleSerializer::Unit(bytes_written) => Ok(bytes_written), + TupleSerializer::One(one) => one.end(), + TupleSerializer::OneUnion(one) => one.end(), + TupleSerializer::Many(many) => SerializeTuple::end(many), + } + } +} + +pub struct ManyTupleSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + writer: &'w mut W, + schema: &'s RecordSchema, + config: Config<'s, S>, + field_position: usize, + bytes_written: usize, +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> ManyTupleSerializer<'s, 'w, W, S> { + pub fn new( + writer: &'w mut W, + schema: &'s RecordSchema, + config: Config<'s, S>, + bytes_written: Option<usize>, + ) -> Self { + Self { + writer, + schema, + config, + field_position: 0, + bytes_written: bytes_written.unwrap_or(0), + } + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeTuple for ManyTupleSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + let schema = &self.schema.fields[self.field_position].schema; + self.bytes_written += value.serialize(SchemaAwareSerializer::new( + self.writer, + schema, + self.config, + )?)?; + self.field_position += 1; + Ok(()) + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + assert_eq!(self.field_position, self.schema.fields.len()); Review Comment: Why assert_eq!(), i.e. a possible panic ? Why not an Err or at least a debug_assert_eq!()? 
########## avro/src/reader/mod.rs: ########## @@ -113,6 +125,10 @@ impl<'a, R: Read> Reader<'a, R> { self.block.read_next(read_schema) } + + fn read_next_deser<T: DeserializeOwned>(&mut self) -> AvroResult<Option<T>> { + self.block.read_next_deser(self.reader_schema) Review Comment: Should this also use `if self.should_resolve_schema {}` as ::read_next() does above ? ########## avro/src/reader/block.rs: ########## @@ -205,6 +215,43 @@ impl<'r, R: Read> Block<'r, R> { Ok(Some(item)) } + pub(super) fn read_next_deser<T: DeserializeOwned>( + &mut self, + read_schema: Option<&Schema>, + ) -> AvroResult<Option<T>> { + if self.is_empty() { + self.read_block_next()?; + if self.is_empty() { + return Ok(None); + } + } + + let mut block_bytes = &self.buf[self.buf_idx..]; + let b_original = block_bytes.len(); + + let item = if read_schema.is_some() { + todo!("Schema aware deserialisation does not resolve schemas yet"); Review Comment: Is this a todo for the current PR or for a follow-up ? ########## avro/src/documentation/avro_data_model_to_serde.rs: ########## @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Mapping the Avro data model to the Serde data model +//! +//! 
When manually mapping an Avro schema to Rust types it is important to understand +//! how the different data models are mapped. When mapping from Rust types to an Avro schema, +//! see [the documentation for the reverse](super::serde_data_model_to_avro). +//! +//! Only the the mapping as defined here is supported. Any other behavior might change in a minor version. +//! +//! ## Primitive Types +//! - `null`: `()` +//! - `boolean`: [`bool`] +//! - `int`: [`i32`] (or [`i16`], [`i8`], [`u16`], [`u8`]) +//! - `long`: [`i64`] (or [`u32`]) +//! - `float`: [`f32`] +//! - `double`: [`f64`] +//! - `bytes`: [`Vec<u8>`](std::vec::Vec) (or any type that uses [`Serialize::serialize_bytes`](serde::Serialize), [`Deserialize::deserialize_bytes`](serde::Deserialize), [`Deserialize::deserialize_byte_buf`](serde::Deserialize)) +//! - It is required to use [`apache_avro::serde::bytes`] as otherwise Serde will (de)serialize a `Vec` as an array of integers instead. +//! - `string`: [`String`] (or any type that uses [`Serialize::serialize_str`](serde::Serialize), [`Deserialize::deserialize_str`](serde::Deserialize), [`Deserialize::deserialize_string`](serde::Deserialize)) +//! +//! ## Complex Types +//! - `records`: A struct with the same name and fields or a tuple with the same fields. +//! - Extra fields can be added to the struct if they are marked with `#[serde(skip)]` +//! - `enums`: A enum with the same name and unit variants for every symbol +//! - The index of the symbol most match the index of the variant Review Comment: ```suggestion //! - The index of the symbol must match the index of the variant ``` ########## avro/src/serde/mod.rs: ########## @@ -27,7 +27,8 @@ //! details on how to change the generated schema. //! //! Alternatively, you can write your own schema. If you go down this path, it is recommended you start with -//! the schema derived by [`AvroSchema`] and then modify it to fit your needs. +//! 
the schema derived by [`AvroSchema`] and then modify it to fit your needs. For more information on mapping +//! between Sere and Avro see [Avro to Serde](crate::documentation::avro_data_model_to_serde) and [Serde to Avro](crate::documentation::serde_data_model_to_avro). Review Comment: ```suggestion //! between Serde and Avro see [Avro to Serde](crate::documentation::avro_data_model_to_serde) and [Serde to Avro](crate::documentation::serde_data_model_to_avro). ``` ########## avro/src/serde/ser_schema/block.rs: ########## @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use std::{borrow::Borrow, io::Write}; + +use serde::{ + Serialize, + ser::{SerializeMap, SerializeSeq}, +}; + +use super::{Config, SchemaAwareSerializer}; +use crate::{ + Error, Schema, + encode::encode_int, + error::Details, + schema::{ArraySchema, MapSchema}, + util::zig_i32, +}; + +#[expect( + private_interfaces, + reason = "Direct and Buffered should not be used directly" +)] +pub enum BlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + Direct(DirectBlockSerializer<'s, 'w, W, S>), + Buffered(BufferedBlockSerializer<'s, 'w, W, S>), +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> BlockSerializer<'s, 'w, W, S> { + pub fn array( + writer: &'w mut W, + schema: &'s ArraySchema, + config: Config<'s, S>, + len: Option<usize>, + bytes_written: Option<usize>, + ) -> Result<Self, Error> { + let schema = if let Schema::Ref { name } = schema.items.as_ref() { + config.get_schema(name)? + } else { + &schema.items + }; + + Self::new(writer, schema, config, len, bytes_written) + } + + pub fn map( + writer: &'w mut W, + schema: &'s MapSchema, + config: Config<'s, S>, + len: Option<usize>, + bytes_written: Option<usize>, + ) -> Result<Self, Error> { + let schema = if let Schema::Ref { name } = schema.types.as_ref() { + config.get_schema(name)? 
+ } else { + &schema.types + }; + + Self::new(writer, schema, config, len, bytes_written) + } + + fn new( + writer: &'w mut W, + schema: &'s Schema, + config: Config<'s, S>, + len: Option<usize>, + bytes_written: Option<usize>, + ) -> Result<Self, Error> { + let bytes_written = bytes_written.unwrap_or(0); + if let Some(len) = len + && config.target_block_size.is_none() + { + Ok(Self::Direct(DirectBlockSerializer::new( + writer, + schema, + config, + len, + bytes_written, + )?)) + } else { + let target_block_size = config.target_block_size.unwrap_or(1024); + Ok(Self::Buffered(BufferedBlockSerializer::new( + writer, + schema, + config, + target_block_size, + bytes_written, + ))) + } + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for BlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + match self { + BlockSerializer::Direct(direct) => direct.serialize_element(value), + BlockSerializer::Buffered(buffered) => buffered.serialize_element(value), + } + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + match self { + BlockSerializer::Direct(direct) => direct.end(), + BlockSerializer::Buffered(buffered) => buffered.end(), + } + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for BlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + match self { + BlockSerializer::Direct(direct) => direct.serialize_key(key), + BlockSerializer::Buffered(buffered) => buffered.serialize_key(key), + } + } + + fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + match self { + BlockSerializer::Direct(direct) => direct.serialize_value(value), + BlockSerializer::Buffered(buffered) => buffered.serialize_value(value), + } + } + + fn end(self) -> 
Result<Self::Ok, Self::Error> { + match self { + BlockSerializer::Direct(direct) => direct.end(), + BlockSerializer::Buffered(buffered) => buffered.end(), + } + } +} + +struct DirectBlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + writer: &'w mut W, + schema: &'s Schema, + config: Config<'s, S>, + bytes_written: usize, +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> DirectBlockSerializer<'s, 'w, W, S> { + pub fn new( + writer: &'w mut W, + schema: &'s Schema, + config: Config<'s, S>, + len: usize, + mut bytes_written: usize, + ) -> Result<Self, Error> { + if len != 0 { + // .end() always writes the zero block, so we only write the size for arrays + // that have at least one element + bytes_written += zig_i32(len as i32, &mut *writer)?; + } + Ok(Self { + writer, + schema, + config, + bytes_written, + }) + } + + fn end(self) -> Result<usize, Error> { + // Write the zero directly instead of through zig_i32 which does a lot of extra work + self.writer.write_all(&[0]).map_err(Details::WriteBytes)?; + + Ok(self.bytes_written + 1) + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for DirectBlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + self.bytes_written += value.serialize(SchemaAwareSerializer::new( + self.writer, + self.schema, + self.config, + )?)?; + Ok(()) + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + self.end() + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for DirectBlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + self.bytes_written += key.serialize(SchemaAwareSerializer::new( + self.writer, + &Schema::String, + self.config, + )?)?; + Ok(()) + } + + fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + 
Serialize, + { + self.serialize_element(value) + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + self.end() + } +} + +struct BufferedBlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + writer: &'w mut W, + buffer: Vec<u8>, + schema: &'s Schema, + config: Config<'s, S>, + bytes_written: usize, + items_in_buffer: usize, + target_block_size: usize, +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> BufferedBlockSerializer<'s, 'w, W, S> { + pub fn new( + writer: &'w mut W, + schema: &'s Schema, + config: Config<'s, S>, + target_block_size: usize, + bytes_written: usize, + ) -> Self { + Self { + writer, + buffer: Vec::with_capacity(target_block_size), + schema, + config, + bytes_written, + items_in_buffer: 0, + target_block_size, + } + } + + /// Write a block including the items and size header. + fn write_block(&mut self) -> Result<(), Error> { + // Write the header, the negative item count indicates that the next value is the size of the + // block in bytes + self.bytes_written += encode_int(0 - (self.items_in_buffer as i32), &mut *self.writer)?; + self.bytes_written += encode_int(self.buffer.len() as i32, &mut *self.writer)?; + + // Write the actual data + self.writer + .write_all(&self.buffer) + .map_err(Details::WriteBytes)?; + self.bytes_written += self.buffer.len(); + + // Reset the buffer + self.items_in_buffer = 0; + self.buffer.clear(); + + Ok(()) + } + + fn end(mut self) -> Result<usize, Error> { + // Write any items remaining in the buffer + if self.items_in_buffer > 0 { + self.write_block()?; + } + debug_assert_eq!(self.buffer.len(), 0, "Buffer must be empty at this point"); + + // Write the zero directly instead of through zig_i32 which does a lot of extra work + self.writer.write_all(&[0]).map_err(Details::WriteBytes)?; + + Ok(self.bytes_written + 1) + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for BufferedBlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_element<T>(&mut self, value: 
&T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + value.serialize(SchemaAwareSerializer::new( + self.writer, Review Comment: Shouldn't this use the buffer ? ```suggestion &mut self.buffer, ``` ########## avro/src/documentation/serde_data_model_to_avro.rs: ########## @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Mapping the Serde data model to the Avro data model +//! +//! When manually mapping Rust types to an Avro schema, or the reverse, it is important to understand +//! how the different data models are mapped. When mapping from an Avro schema to Rust types, +//! see [the documentation for the reverse](super::serde_data_model_to_avro). +//! +//! Only the schemas generated by the [`AvroSchema`] derive and the mapping as defined here are +//! supported. Any other behavior might change in a minor version. +//! +//! The following list is based on [the data model defined by Serde](https://serde.rs/data-model.html): +//! - **14 primitive types** +//! - `bool` => [`Schema::Boolean`] +//! - `i8`, `i16`, `i32`, `u8`, `u16` => [`Schema::Int`] +//! - `i64`, `u32` => [`Schema::Long`] +//! - `u64` => [`Schema::Fixed`]`(name: "u64", size: 8)` +//! 
- This is not a `Schema::Long` as that is a signed number of maximum 64 bits. +//! - `i128` => [`Schema::Fixed`]`(name: "i128", size: 16)` +//! - `u128` => [`Schema::Fixed`]`(name: "u128", size: 16)` +//! - `f32` => [`Schema::Float`] +//! - `f64` => [`Schema::Double`] +//! - `char` => [`Schema::String`] +//! - Only one character allowed, deserializer will return an error for strings with more than one character. +//! - **string** => [`Schema::String`] +//! - **byte array** => [`Schema::Bytes`] or [`Schema::Fixed`] +//! - **option** => [`Schema::Union([Schema::Null, _])`](crate::schema::Schema::Union) +//! - **unit** => [`Schema::Null`] +//! - **unit struct** => [`Schema::Record`] with the unqualified name equal to the name of the struct and zero fields +//! - **unit variant** => See [Enums](##Enums) +//! - **newtype struct** => [`Schema::Record`] with the unqualified name equal to the name of the struct and one field +//! - **newtype variant** => See [Enums](##Enums) +//! - **seq** => [`Schema::Array`] +//! - **tuple** +//! - For tuples with one element, the schema will be the schema the only element Review Comment: ```suggestion //! - For tuples with one element, the schema will be the schema of the only element ``` ########## avro/src/documentation/serde_data_model_to_avro.rs: ########## @@ -0,0 +1,116 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Mapping the Serde data model to the Avro data model +//! +//! When manually mapping Rust types to an Avro schema, or the reverse, it is important to understand +//! how the different data models are mapped. When mapping from an Avro schema to Rust types, +//! see [the documentation for the reverse](super::serde_data_model_to_avro). Review Comment: This link seems to point to the current module/document itself rather than to the reverse mapping (`super::avro_data_model_to_serde`). ########## avro/src/schema/mod.rs: ########## @@ -784,6 +785,69 @@ impl Schema { } Ok(()) } + + /// Derive a name for this schema. + /// + /// The name is a valid schema name and will be unique if the named + /// schemas in this schema have unique names.
+ pub(crate) fn unique_normalized_name(&self) -> Cow<'static, str> { + match self { + Schema::Null => Cow::Borrowed("n"), + Schema::Boolean => Cow::Borrowed("B"), + Schema::Int => Cow::Borrowed("i"), + Schema::Long => Cow::Borrowed("l"), + Schema::Float => Cow::Borrowed("f"), + Schema::Double => Cow::Borrowed("d"), + Schema::Bytes => Cow::Borrowed("b"), + Schema::String => Cow::Borrowed("s"), + Schema::Array(array) => { + Cow::Owned(format!("a_{}", array.items.unique_normalized_name())) + } + Schema::Map(map) => Cow::Owned(format!("m_{}", map.types.unique_normalized_name())), + Schema::Union(union) => { + let mut name = format!("u{}", union.schemas.len()); + for schema in &union.schemas { + name.push('_'); + name.push_str(&schema.unique_normalized_name()); + } + Cow::Owned(name) + } + Schema::BigDecimal => Cow::Borrowed("bd"), + Schema::Date => Cow::Borrowed("D"), + Schema::TimeMillis => Cow::Borrowed("t"), + Schema::TimeMicros => Cow::Borrowed("tm"), + Schema::TimestampMillis => Cow::Borrowed("T"), + Schema::TimestampMicros => Cow::Borrowed("TM"), + Schema::TimestampNanos => Cow::Borrowed("TN"), + Schema::LocalTimestampMillis => Cow::Borrowed("L"), + Schema::LocalTimestampMicros => Cow::Borrowed("LM"), + Schema::LocalTimestampNanos => Cow::Borrowed("LN"), + Schema::Decimal(DecimalSchema { + inner: InnerDecimalSchema::Bytes, + precision, + scale, + }) => Cow::Owned(format!("db_{precision}_{scale}")), + Schema::Uuid(UuidSchema::Bytes) => Cow::Borrowed("ub"), + Schema::Uuid(UuidSchema::String) => Cow::Borrowed("us"), + Schema::Record(RecordSchema { name, .. }) + | Schema::Enum(EnumSchema { name, .. }) + | Schema::Fixed(FixedSchema { name, .. }) + | Schema::Decimal(DecimalSchema { + inner: InnerDecimalSchema::Fixed(FixedSchema { name, .. }), + .. + }) + | Schema::Uuid(UuidSchema::Fixed(FixedSchema { name, .. })) + | Schema::Duration(FixedSchema { name, .. 
}) + | Schema::Ref { name } => { + let name: String = name + .to_string() + .chars() + .map(|c| if c.is_ascii_alphanumeric() { c } else { '_' }) Review Comment: IMO `_` should be handled specially here. Currently two distinct names such as `a.b.c` and `a_b.c` will both lead to `r5_a_b_c`. ########## avro/src/serde/deser_schema/tuple.rs: ########## @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{borrow::Borrow, io::Read}; + +use serde::de::{DeserializeSeed, SeqAccess}; + +use crate::{ + Error, Schema, + schema::RecordSchema, + serde::deser_schema::{Config, SchemaAwareDeserializer}, +}; + +pub struct OneTupleDeserializer<'s, 'r, R: Read, S: Borrow<Schema>> { + reader: &'r mut R, + schema: &'s Schema, + config: Config<'s, S>, + field_read: bool, +} + +impl<'s, 'r, R: Read, S: Borrow<Schema>> OneTupleDeserializer<'s, 'r, R, S> { + pub fn new( + reader: &'r mut R, + schema: &'s Schema, + config: Config<'s, S>, + ) -> Result<Self, Error> { + let schema = if let Schema::Ref { name } = schema { + config.get_schema(name)?
+ } else { + schema + }; + Ok(Self { + reader, + schema, + config, + field_read: false, + }) + } +} + +impl<'de, 's, 'r, R: Read, S: Borrow<Schema>> SeqAccess<'de> + for OneTupleDeserializer<'s, 'r, R, S> +{ + type Error = Error; + + fn next_element_seed<T>(&mut self, seed: T) -> Result<Option<T::Value>, Self::Error> + where + T: DeserializeSeed<'de>, + { + if self.field_read { + Ok(None) + } else { + self.field_read = true; + seed.deserialize(SchemaAwareDeserializer::new( + self.reader, + self.schema, + self.config, + )?) + .map(Some) + } + } + + fn size_hint(&self) -> Option<usize> { + Some(1 - usize::from(self.field_read)) + } +} + +pub struct ManyTupleDeserializer<'s, 'r, R: Read, S: Borrow<Schema>> { + reader: &'r mut R, + schema: &'s RecordSchema, + config: Config<'s, S>, + current_field: usize, +} + +impl<'s, 'r, R: Read, S: Borrow<Schema>> ManyTupleDeserializer<'s, 'r, R, S> { + pub fn new(reader: &'r mut R, schema: &'s RecordSchema, config: Config<'s, S>) -> Self { + Self { + reader, + schema, + config, + current_field: 0, + } + } +} + +impl<'de, 's, 'r, R: Read, S: Borrow<Schema>> SeqAccess<'de> + for ManyTupleDeserializer<'s, 'r, R, S> +{ + type Error = Error; + + fn next_element_seed<T>(&mut self, seed: T) -> Result<Option<T::Value>, Self::Error> + where + T: DeserializeSeed<'de>, + { + if self.current_field < self.schema.fields.len() { + let schema = &self.schema.fields[self.current_field].schema; + self.current_field += 1; + seed.deserialize(SchemaAwareDeserializer::new( + self.reader, + schema, + self.config, + )?) + .map(Some) + } else { + Ok(None) + } + } + + fn size_hint(&self) -> Option<usize> { + todo!() Review Comment: ```suggestion Some(self.schema.fields.len().saturating_sub(self.current_field)) ``` ########## avro/src/serde/ser_schema/block.rs: ########## @@ -0,0 +1,361 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{borrow::Borrow, io::Write}; + +use serde::{ + Serialize, + ser::{SerializeMap, SerializeSeq}, +}; + +use super::{Config, SchemaAwareSerializer}; +use crate::{ + Error, Schema, + encode::encode_int, + error::Details, + schema::{ArraySchema, MapSchema}, + util::zig_i32, +}; + +#[expect( + private_interfaces, + reason = "Direct and Buffered should not be used directly" +)] +pub enum BlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + Direct(DirectBlockSerializer<'s, 'w, W, S>), + Buffered(BufferedBlockSerializer<'s, 'w, W, S>), +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> BlockSerializer<'s, 'w, W, S> { + pub fn array( + writer: &'w mut W, + schema: &'s ArraySchema, + config: Config<'s, S>, + len: Option<usize>, + bytes_written: Option<usize>, + ) -> Result<Self, Error> { + let schema = if let Schema::Ref { name } = schema.items.as_ref() { + config.get_schema(name)? + } else { + &schema.items + }; + + Self::new(writer, schema, config, len, bytes_written) + } + + pub fn map( + writer: &'w mut W, + schema: &'s MapSchema, + config: Config<'s, S>, + len: Option<usize>, + bytes_written: Option<usize>, + ) -> Result<Self, Error> { + let schema = if let Schema::Ref { name } = schema.types.as_ref() { + config.get_schema(name)? 
+ } else { + &schema.types + }; + + Self::new(writer, schema, config, len, bytes_written) + } + + fn new( + writer: &'w mut W, + schema: &'s Schema, + config: Config<'s, S>, + len: Option<usize>, + bytes_written: Option<usize>, + ) -> Result<Self, Error> { + let bytes_written = bytes_written.unwrap_or(0); + if let Some(len) = len + && config.target_block_size.is_none() + { + Ok(Self::Direct(DirectBlockSerializer::new( + writer, + schema, + config, + len, + bytes_written, + )?)) + } else { + let target_block_size = config.target_block_size.unwrap_or(1024); + Ok(Self::Buffered(BufferedBlockSerializer::new( + writer, + schema, + config, + target_block_size, + bytes_written, + ))) + } + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for BlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + match self { + BlockSerializer::Direct(direct) => direct.serialize_element(value), + BlockSerializer::Buffered(buffered) => buffered.serialize_element(value), + } + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + match self { + BlockSerializer::Direct(direct) => direct.end(), + BlockSerializer::Buffered(buffered) => buffered.end(), + } + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for BlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + match self { + BlockSerializer::Direct(direct) => direct.serialize_key(key), + BlockSerializer::Buffered(buffered) => buffered.serialize_key(key), + } + } + + fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + match self { + BlockSerializer::Direct(direct) => direct.serialize_value(value), + BlockSerializer::Buffered(buffered) => buffered.serialize_value(value), + } + } + + fn end(self) -> 
Result<Self::Ok, Self::Error> { + match self { + BlockSerializer::Direct(direct) => direct.end(), + BlockSerializer::Buffered(buffered) => buffered.end(), + } + } +} + +struct DirectBlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + writer: &'w mut W, + schema: &'s Schema, + config: Config<'s, S>, + bytes_written: usize, +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> DirectBlockSerializer<'s, 'w, W, S> { + pub fn new( + writer: &'w mut W, + schema: &'s Schema, + config: Config<'s, S>, + len: usize, + mut bytes_written: usize, + ) -> Result<Self, Error> { + if len != 0 { + // .end() always writes the zero block, so we only write the size for arrays + // that have at least one element + bytes_written += zig_i32(len as i32, &mut *writer)?; + } + Ok(Self { + writer, + schema, + config, + bytes_written, + }) + } + + fn end(self) -> Result<usize, Error> { + // Write the zero directly instead of through zig_i32 which does a lot of extra work + self.writer.write_all(&[0]).map_err(Details::WriteBytes)?; + + Ok(self.bytes_written + 1) + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for DirectBlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_element<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + self.bytes_written += value.serialize(SchemaAwareSerializer::new( + self.writer, + self.schema, + self.config, + )?)?; + Ok(()) + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + self.end() + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for DirectBlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + self.bytes_written += key.serialize(SchemaAwareSerializer::new( + self.writer, + &Schema::String, + self.config, + )?)?; + Ok(()) + } + + fn serialize_value<T>(&mut self, value: &T) -> Result<(), Self::Error> + where + T: ?Sized + 
Serialize, + { + self.serialize_element(value) + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + self.end() + } +} + +struct BufferedBlockSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + writer: &'w mut W, + buffer: Vec<u8>, + schema: &'s Schema, + config: Config<'s, S>, + bytes_written: usize, + items_in_buffer: usize, + target_block_size: usize, +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> BufferedBlockSerializer<'s, 'w, W, S> { + pub fn new( + writer: &'w mut W, + schema: &'s Schema, + config: Config<'s, S>, + target_block_size: usize, + bytes_written: usize, + ) -> Self { + Self { + writer, + buffer: Vec::with_capacity(target_block_size), + schema, + config, + bytes_written, + items_in_buffer: 0, + target_block_size, + } + } + + /// Write a block including the items and size header. + fn write_block(&mut self) -> Result<(), Error> { + // Write the header, the negative item count indicates that the next value is the size of the + // block in bytes + self.bytes_written += encode_int(0 - (self.items_in_buffer as i32), &mut *self.writer)?; + self.bytes_written += encode_int(self.buffer.len() as i32, &mut *self.writer)?; + + // Write the actual data + self.writer + .write_all(&self.buffer) + .map_err(Details::WriteBytes)?; + self.bytes_written += self.buffer.len(); + + // Reset the buffer + self.items_in_buffer = 0; + self.buffer.clear(); + + Ok(()) + } + + fn end(mut self) -> Result<usize, Error> { + // Write any items remaining in the buffer + if self.items_in_buffer > 0 { + self.write_block()?; + } + debug_assert_eq!(self.buffer.len(), 0, "Buffer must be empty at this point"); + + // Write the zero directly instead of through zig_i32 which does a lot of extra work + self.writer.write_all(&[0]).map_err(Details::WriteBytes)?; + + Ok(self.bytes_written + 1) + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeSeq for BufferedBlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_element<T>(&mut self, value: 
&T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + value.serialize(SchemaAwareSerializer::new( + self.writer, + self.schema, + self.config, + )?)?; + self.items_in_buffer += 1; + + if self.buffer.len() >= self.target_block_size { + self.write_block()?; + } + Ok(()) + } + + fn end(self) -> Result<Self::Ok, Self::Error> { + self.end() + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> SerializeMap for BufferedBlockSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + + fn serialize_key<T>(&mut self, key: &T) -> Result<(), Self::Error> + where + T: ?Sized + Serialize, + { + key.serialize(SchemaAwareSerializer::new( + self.writer, Review Comment: Shouldn't this use the buffer ? ```suggestion &mut self.buffer, ``` ########## avro/src/error.rs: ########## @@ -156,7 +157,6 @@ pub enum Details { #[error("Union index {index} out of bounds: {num_variants}")] GetUnionVariant { index: i64, num_variants: usize }, - #[deprecated(since = "0.20.0", note = "This error variant is not generated anymore")] #[error("Enum symbol index out of bounds: {num_variants}")] Review Comment: let's add `index` to the error message ########## avro/src/serde/ser_schema/record/field_default.rs: ########## @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Serialize, Serializer, ser::Error}; +use serde_json::Value; + +use crate::{ + Schema, + schema::{SchemaKind, UnionSchema, UuidSchema}, + serde::ser_schema::SERIALIZING_SCHEMA_DEFAULT, +}; + +pub struct SchemaAwareRecordFieldDefault<'v, 's> { + value: &'v Value, + schema: &'s Schema, +} + +impl<'v, 's> SchemaAwareRecordFieldDefault<'v, 's> { + pub fn new(value: &'v Value, schema: &'s Schema) -> Self { + SchemaAwareRecordFieldDefault { value, schema } + } + + fn serialize_as_newtype_variant<S: Serializer>( + &self, + serializer: S, + index: usize, + union: &'s UnionSchema, + ) -> Result<S::Ok, S::Error> { + let value = Self::new(self.value, &union.variants()[index]); + serializer.serialize_newtype_variant( + SERIALIZING_SCHEMA_DEFAULT, + index as u32, + SERIALIZING_SCHEMA_DEFAULT, + &value, + ) + } + + fn recursive_type_check(value: &Value, schema: &'s Schema) -> bool { + match (value, schema) { + (Value::Null, Schema::Null) + | (Value::Bool(_), Schema::Boolean) + | (Value::String(_), Schema::Bytes | Schema::String) => true, + (Value::Number(n), Schema::Int | Schema::Date | Schema::TimeMillis) if n.is_i64() => { + let long = n.as_i64().unwrap(); + i32::try_from(long).is_ok() + } + ( + Value::Number(n), + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos, + ) if n.is_i64() => true, + (Value::Number(n), Schema::Float | Schema::Double) if n.is_f64() => true, + (Value::String(s), Schema::Fixed(fixed)) => s.len() == fixed.size, + (Value::String(s), Schema::Enum(enum_schema)) => enum_schema.symbols.contains(s), + (Value::Object(o), Schema::Record(record)) => record.fields.iter().all(|field| { + if let Some(value) = o.get(&field.name) { + Self::recursive_type_check(value, &field.schema) + } 
else { + field.default.is_some() + } + }), + (Value::Object(o), Schema::Map(map)) => o + .values() + .all(|value| Self::recursive_type_check(value, &map.types)), + (Value::Array(a), Schema::Array(array)) => a + .iter() + .all(|value| Self::recursive_type_check(value, &array.items)), + (_, Schema::Union(union)) => union + .variants() + .iter() + .any(|variant| Self::recursive_type_check(value, variant)), + _ => false, + } + } +} + +impl<'v, 's> Serialize for SchemaAwareRecordFieldDefault<'v, 's> { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + match (&self.value, self.schema) { + (Value::Null, Schema::Null) => serializer.serialize_unit(), + (Value::Bool(boolean), Schema::Boolean) => serializer.serialize_bool(*boolean), + (Value::Number(n), Schema::Int | Schema::Date | Schema::TimeMillis) if n.is_i64() => { + let long = n.as_i64().unwrap(); + let int = i32::try_from(long).map_err(|_| { + S::Error::custom(format!("Default {long} is too large for {:?}", self.schema)) + })?; + serializer.serialize_i32(int) + } + ( + Value::Number(n), + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos, + ) if n.is_i64() => { + let long = n.as_i64().unwrap(); + serializer.serialize_i64(long) + } + (Value::Number(n), Schema::Float) if n.is_f64() => { Review Comment: ```suggestion (Value::Number(n), Schema::Float) if n.as_f64().is_some() => { ``` ########## avro/src/serde/deser_schema/map.rs: ########## @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{borrow::Borrow, io::Read}; + +use serde::de::{DeserializeSeed, MapAccess}; + +use crate::{ + Error, Schema, + schema::MapSchema, + serde::deser_schema::{Config, SchemaAwareDeserializer}, + util::{zag_i32, zag_i64}, +}; + +pub struct MapDeserializer<'s, 'r, R: Read, S: Borrow<Schema>> { + reader: &'r mut R, + schema: &'s Schema, + config: Config<'s, S>, + remaining: Option<u32>, +} + +impl<'s, 'r, R: Read, S: Borrow<Schema>> MapDeserializer<'s, 'r, R, S> { + pub fn new( + reader: &'r mut R, + schema: &'s MapSchema, + config: Config<'s, S>, + ) -> Result<Self, Error> { + let schema = if let Schema::Ref { name } = schema.types.as_ref() { + config.get_schema(name)? 
+ } else { + &schema.types + }; + let remaining = Self::read_block_header(reader)?; + Ok(Self { + schema, + reader, + config, + remaining, + }) + } + + fn read_block_header(reader: &mut R) -> Result<Option<u32>, Error> { + let remaining = zag_i32(reader)?; + if remaining < 0 { + // If the block size is negative the next number is the size of the block in bytes + let _bytes = zag_i64(reader)?; + } + if remaining == 0 { + // If the block size is zero the array is finished Review Comment: ```suggestion // If the block size is zero the map is finished ``` ########## avro/src/writer/mod.rs: ########## @@ -67,6 +70,19 @@ impl<'a, W: Write> Writer<'a, W> { #[builder(default = false)] has_header: bool, #[builder(default)] user_metadata: HashMap<String, Value>, + /// Should [`Serialize`] implementations pick a human readable represenation. Review Comment: ```suggestion /// Should [`Serialize`] implementations pick a human readable representation. ``` ########## avro/src/reader/block.rs: ########## @@ -205,6 +215,43 @@ impl<'r, R: Read> Block<'r, R> { Ok(Some(item)) } + pub(super) fn read_next_deser<T: DeserializeOwned>( + &mut self, + read_schema: Option<&Schema>, + ) -> AvroResult<Option<T>> { + if self.is_empty() { + self.read_block_next()?; + if self.is_empty() { + return Ok(None); + } + } + + let mut block_bytes = &self.buf[self.buf_idx..]; + let b_original = block_bytes.len(); + + let item = if read_schema.is_some() { + todo!("Schema aware deserialisation does not resolve schemas yet"); + } else { + let config = Config { + names: &self.names_refs, + human_readable: self.human_readable, + }; + T::deserialize(SchemaAwareDeserializer::new( + &mut block_bytes, + &self.writer_schema, + config, + )?)? + }; + + if b_original != 0 && b_original == block_bytes.len() { + // No bytes were read, return an error to avoid an infinite loop + return Err(Details::ReadBlock.into()); Review Comment: Let's add some more context information to this error. 
As a user I won't be sure what to do. ########## avro/src/serde/ser_schema/record/field_default.rs: ########## @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Serialize, Serializer, ser::Error}; +use serde_json::Value; + +use crate::{ + Schema, + schema::{SchemaKind, UnionSchema, UuidSchema}, + serde::ser_schema::SERIALIZING_SCHEMA_DEFAULT, +}; + +pub struct SchemaAwareRecordFieldDefault<'v, 's> { + value: &'v Value, + schema: &'s Schema, +} + +impl<'v, 's> SchemaAwareRecordFieldDefault<'v, 's> { + pub fn new(value: &'v Value, schema: &'s Schema) -> Self { + SchemaAwareRecordFieldDefault { value, schema } + } + + fn serialize_as_newtype_variant<S: Serializer>( + &self, + serializer: S, + index: usize, + union: &'s UnionSchema, + ) -> Result<S::Ok, S::Error> { + let value = Self::new(self.value, &union.variants()[index]); + serializer.serialize_newtype_variant( + SERIALIZING_SCHEMA_DEFAULT, + index as u32, + SERIALIZING_SCHEMA_DEFAULT, + &value, + ) + } + + fn recursive_type_check(value: &Value, schema: &'s Schema) -> bool { + match (value, schema) { + (Value::Null, Schema::Null) + | (Value::Bool(_), Schema::Boolean) + | (Value::String(_), Schema::Bytes | Schema::String) => true, + 
(Value::Number(n), Schema::Int | Schema::Date | Schema::TimeMillis) if n.is_i64() => { + let long = n.as_i64().unwrap(); + i32::try_from(long).is_ok() + } + ( + Value::Number(n), + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos, + ) if n.is_i64() => true, + (Value::Number(n), Schema::Float | Schema::Double) if n.is_f64() => true, + (Value::String(s), Schema::Fixed(fixed)) => s.len() == fixed.size, + (Value::String(s), Schema::Enum(enum_schema)) => enum_schema.symbols.contains(s), + (Value::Object(o), Schema::Record(record)) => record.fields.iter().all(|field| { + if let Some(value) = o.get(&field.name) { + Self::recursive_type_check(value, &field.schema) + } else { + field.default.is_some() + } + }), + (Value::Object(o), Schema::Map(map)) => o + .values() + .all(|value| Self::recursive_type_check(value, &map.types)), + (Value::Array(a), Schema::Array(array)) => a + .iter() + .all(|value| Self::recursive_type_check(value, &array.items)), + (_, Schema::Union(union)) => union + .variants() + .iter() + .any(|variant| Self::recursive_type_check(value, variant)), + _ => false, Review Comment: What about the logical types with an inner type (e.g. Uuid, Decimal & Duration)? ########## avro/src/serde/deser_schema/mod.rs: ########## @@ -0,0 +1,1688 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{borrow::Borrow, collections::HashMap, io::Read}; + +use serde::de::{Deserializer, Visitor}; + +use crate::{ + Error, Schema, + decode::decode_len, + error::Details, + schema::{DecimalSchema, InnerDecimalSchema, Name, UnionSchema, UuidSchema}, + util::{zag_i32, zag_i64}, +}; + +mod array; +mod enums; +mod identifier; +mod map; +mod record; +mod tuple; + +use array::ArrayDeserializer; +use enums::PlainEnumDeserializer; +use map::MapDeserializer; +use record::RecordDeserializer; +use tuple::{ManyTupleDeserializer, OneTupleDeserializer}; + +use crate::serde::deser_schema::enums::UnionEnumDeserializer; + +/// Configure the deserializer. +#[derive(Debug)] +pub struct Config<'s, S: Borrow<Schema>> { + /// Any references in the schema will be resolved using this map. + /// + /// This map is not allowed to contain any [`Schema::Ref`], the deserializer is allowed to panic + /// in that case. + pub names: &'s HashMap<Name, S>, + /// Was the data serialized with `human_readable`. + pub human_readable: bool, +} + +impl<'s, S: Borrow<Schema>> Config<'s, S> { + /// Get the schema for this name. + fn get_schema(&self, name: &Name) -> Result<&'s Schema, Error> { + self.names + .get(name) + .map(Borrow::borrow) + .ok_or_else(|| Details::SchemaResolutionError(name.clone()).into()) + } +} + +// This needs to be implemented manually as the derive puts a bound on `S` +// which is unnecessary as a reference is always Copy. 
+impl<'s, S: Borrow<Schema>> Copy for Config<'s, S> {} +impl<'s, S: Borrow<Schema>> Clone for Config<'s, S> { + fn clone(&self) -> Self { + *self + } +} + +/// A deserializer that deserializes directly from raw Avro datum. +pub struct SchemaAwareDeserializer<'s, 'r, R: Read, S: Borrow<Schema>> { + reader: &'r mut R, + /// The schema of the data being deserialized. + /// + /// This schema is guaranteed to not be a [`Schema::Ref`]. + schema: &'s Schema, + config: Config<'s, S>, +} + +impl<'s, 'r, R: Read, S: Borrow<Schema>> SchemaAwareDeserializer<'s, 'r, R, S> { + /// Create a new deserializer for this schema. + /// + /// This will resolve a [`Schema::Ref`] to its actual schema. + pub fn new( + reader: &'r mut R, + schema: &'s Schema, + config: Config<'s, S>, + ) -> Result<Self, Error> { + if let Schema::Ref { name } = schema { + let schema = config.get_schema(name)?; + Ok(Self { + reader, + schema, + config, + }) + } else { + Ok(Self { + reader, + schema, + config, + }) + } + } + + /// Create an error for the current type being deserialized with the given message. + /// + /// This will also include the current schema. + fn error(&self, ty: &'static str, error: impl Into<String>) -> Error { + Error::new(Details::DeserializeSchemaAware { + value_type: ty, + value: error.into(), + schema: self.schema.clone(), + }) + } + + /// Create a new deserializer with the existing reader and config. + /// + /// This will resolve the schema if it is a reference. + fn with_different_schema(mut self, schema: &'s Schema) -> Result<Self, Error> { + self.schema = if let Schema::Ref { name } = schema { + self.config.get_schema(name)? + } else { + schema + }; + Ok(self) + } + + /// Read the union and create a new deserializer with the existing reader and config. + /// + /// This will resolve the read schema if it is a reference. 
+ fn with_union(self, schema: &'s UnionSchema) -> Result<Self, Error> { + let index = zag_i32(self.reader)?; + let variant = schema.get_variant(index as usize)?; + self.with_different_schema(variant) + } + + /// Read an integer from the reader. + /// + /// This will check that the current schema is [`Schema::Int`] or a logical type based on that. + /// It does not read [`Schema::Union`]s. + fn checked_read_int(&mut self, original_ty: &'static str) -> Result<i32, Error> { + match self.schema { + Schema::Int | Schema::Date | Schema::TimeMillis => zag_i32(self.reader), + _ => Err(self.error( + original_ty, + "Expected Schema::Int | Schema::Date | Schema::TimeMillis", + )), + } + } + + /// Read a long from the reader. + /// + /// This will check that the current schema is [`Schema::Long`] or a logical type based on that. + /// It does not read [`Schema::Union`]s. + fn checked_read_long(&mut self, original_ty: &'static str) -> Result<i64, Error> { + match self.schema { + Schema::Long | Schema::TimeMicros | Schema::TimestampMillis | Schema::TimestampMicros + | Schema::TimestampNanos | Schema::LocalTimestampMillis | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos => zag_i64(self.reader), + _ => Err(self.error( + original_ty, + "Expected Schema::Long | Schema::TimeMicros | Schema::{,Local}Timestamp{Millis,Micros,Nanos}", + )), + + } + } + + /// Read a string from the reader. + /// + /// This does not check the current schema. + fn read_string(&mut self) -> Result<String, Error> { + let bytes = self.read_bytes_with_len()?; + Ok(String::from_utf8(bytes).map_err(Details::ConvertToUtf8)?) + } + + /// Read a bytes from the reader. + /// + /// This does not check the current schema. + fn read_bytes_with_len(&mut self) -> Result<Vec<u8>, Error> { + let length = decode_len(self.reader)?; + self.read_bytes(length) + } + + /// Read `n` bytes from the reader. + /// + /// This does not check the current schema. 
+ fn read_bytes(&mut self, length: usize) -> Result<Vec<u8>, Error> { + let mut buf = vec![0; length]; + self.reader + .read_exact(&mut buf) + .map_err(Details::ReadBytes)?; + Ok(buf) + } + + /// Read `n` bytes from the reader. + /// + /// This does not check the current schema. + fn read_array<const N: usize>(&mut self) -> Result<[u8; N], Error> { + let mut buf = [0; N]; + self.reader + .read_exact(&mut buf) + .map_err(Details::ReadBytes)?; + Ok(buf) + } +} + +/// A static string that will bypass name checks in `deserialize_*` functions. +/// +/// This is used so that `deserialize_any` can use the `deserialize_*` implementation which take +/// a static string. +/// +/// We don't want users to abuse this feature so this value is compared by pointer address, therefore +/// a user providing the string below will not be able to skip name validation. +static DESERIALIZE_ANY: &str = "This value is compared by pointer value"; +/// A static array so that `deserialize_any` can call `deserialize_*` functions. 
+static DESERIALIZE_ANY_FIELDS: &[&str] = &[]; + +impl<'de, 's, 'r, R: Read, S: Borrow<Schema>> Deserializer<'de> + for SchemaAwareDeserializer<'s, 'r, R, S> +{ + type Error = Error; + + fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.schema { + Schema::Null => self.deserialize_unit(visitor), + Schema::Boolean => self.deserialize_bool(visitor), + Schema::Int | Schema::Date | Schema::TimeMillis => self.deserialize_i32(visitor), + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos => self.deserialize_i64(visitor), + Schema::Float => self.deserialize_f32(visitor), + Schema::Double => self.deserialize_f64(visitor), + Schema::Bytes + | Schema::Fixed(_) + | Schema::Decimal(_) + | Schema::BigDecimal + | Schema::Uuid(UuidSchema::Fixed(_) | UuidSchema::Bytes) + | Schema::Duration(_) => self.deserialize_byte_buf(visitor), + Schema::String | Schema::Uuid(UuidSchema::String) => self.deserialize_string(visitor), + Schema::Array(_) => self.deserialize_seq(visitor), + Schema::Map(_) => self.deserialize_map(visitor), + Schema::Union(union) => self.with_union(union)?.deserialize_any(visitor), + Schema::Record(schema) => { + if schema.attributes.get("org.apache.avro.rust.tuple") + == Some(&serde_json::Value::Bool(true)) + { + // This attribute is needed because we can't tell the difference between a tuple + // and struct, but a tuple needs to be deserialized as a sequence instead of a map. + self.deserialize_tuple(schema.fields.len(), visitor) + } else { + self.deserialize_struct(DESERIALIZE_ANY, DESERIALIZE_ANY_FIELDS, visitor) + } + } + Schema::Enum(_) => { + self.deserialize_enum(DESERIALIZE_ANY, DESERIALIZE_ANY_FIELDS, visitor) + } + Schema::Ref { .. 
} => unreachable!("References are resolved on deserializer creation"), + } + } + + fn deserialize_bool<V>(self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.schema { + Schema::Boolean => { + let mut buf = [0xFF]; + self.reader + .read_exact(&mut buf) + .map_err(Details::ReadBytes)?; + match buf[0] { + 0 => visitor.visit_bool(false), + 1 => visitor.visit_bool(true), + _ => Err(self.error("bool", format!("{} is not a valid boolean", buf[0]))), + } + } + Schema::Union(union) => self.with_union(union)?.deserialize_bool(visitor), + _ => Err(self.error("bool", "Expected Schema::Boolean")), + } + } + + fn deserialize_i8<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + if let Schema::Union(union) = self.schema { + self.with_union(union)?.deserialize_i8(visitor) + } else { + let int = self.checked_read_int("i8")?; + let value = i8::try_from(int) + .map_err(|_| self.error("i8", format!("Could not convert int ({int}) to an i8")))?; + visitor.visit_i8(value) + } + } + + fn deserialize_i16<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + if let Schema::Union(union) = self.schema { + self.with_union(union)?.deserialize_i16(visitor) + } else { + let int = self.checked_read_int("i16")?; + let value = i16::try_from(int).map_err(|_| { + self.error("i16", format!("Could not convert int ({int}) to an i16")) + })?; + visitor.visit_i16(value) + } + } + + fn deserialize_i32<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + if let Schema::Union(union) = self.schema { + self.with_union(union)?.deserialize_i32(visitor) + } else { + visitor.visit_i32(self.checked_read_int("i32")?) 
+ } + } + + fn deserialize_i64<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + if let Schema::Union(union) = self.schema { + self.with_union(union)?.deserialize_i64(visitor) + } else { + visitor.visit_i64(self.checked_read_long("i64")?) + } + } + + fn deserialize_i128<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.schema { + Schema::Fixed(fixed) if fixed.size == 16 && fixed.name.name() == "i128" => { + visitor.visit_i128(i128::from_le_bytes(self.read_array()?)) + } + Schema::Union(union) => self.with_union(union)?.deserialize_i128(visitor), + _ => Err(self.error("i128", r#"Expected Schema::Fixed(name: "i128", size: 16)"#)), + } + } + + fn deserialize_u8<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + if let Schema::Union(union) = self.schema { + self.with_union(union)?.deserialize_u8(visitor) + } else { + let int = self.checked_read_int("u8")?; + let value = u8::try_from(int) + .map_err(|_| self.error("u8", format!("Could not convert int ({int}) to an u8")))?; + visitor.visit_u8(value) + } + } + + fn deserialize_u16<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + if let Schema::Union(union) = self.schema { + self.with_union(union)?.deserialize_u16(visitor) + } else { + let int = self.checked_read_int("u16")?; + let value = u16::try_from(int).map_err(|_| { + self.error("u16", format!("Could not convert int ({int}) to an u16")) + })?; + visitor.visit_u16(value) + } + } + + fn deserialize_u32<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + if let Schema::Union(union) = self.schema { + self.with_union(union)?.deserialize_u32(visitor) + } else { + let long = self.checked_read_long("u32")?; + let value = u32::try_from(long).map_err(|_| { + self.error("u32", format!("Could not convert long ({long}) to an u32")) + })?; + visitor.visit_u32(value) + } 
+ } + + fn deserialize_u64<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.schema { + Schema::Fixed(fixed) if fixed.size == 8 && fixed.name.name() == "u64" => { + visitor.visit_u64(u64::from_le_bytes(self.read_array()?)) + } + Schema::Union(union) => self.with_union(union)?.deserialize_u64(visitor), + _ => Err(self.error("u64", r#"Expected Schema::Fixed(name: "u64", size: 8)"#)), + } + } + + fn deserialize_u128<V>(mut self, visitor: V) -> Result<V::Value, Self::Error> + where + V: Visitor<'de>, + { + match self.schema { + Schema::Fixed(fixed) if fixed.size == 16 && fixed.name.name() == "u128" => { + visitor.visit_u128(u128::from_le_bytes(self.read_array()?)) + } + Schema::Union(union) => self.with_union(union)?.deserialize_i128(visitor), Review Comment: ```suggestion Schema::Union(union) => self.with_union(union)?.deserialize_u128(visitor), ``` copy/paste error ? ########## avro/src/serde/ser_schema/record/mod.rs: ########## @@ -0,0 +1,270 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +mod field_default; + +use std::{borrow::Borrow, cmp::Ordering, collections::HashMap, io::Write}; + +use serde::{ + Serialize, + ser::{SerializeMap, SerializeStruct, SerializeStructVariant}, +}; + +use super::{Config, SchemaAwareSerializer}; +use crate::{ + Error, Schema, + error::Details, + schema::RecordSchema, + serde::{ + ser_schema::record::field_default::SchemaAwareRecordFieldDefault, util::StringSerializer, + }, +}; + +pub struct RecordSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + writer: &'w mut W, + record: &'s RecordSchema, + config: Config<'s, S>, + /// Cache fields received out-of-order + cache: HashMap<usize, Vec<u8>>, + /// The position of the current map entry being written + map_position: Option<usize>, + /// The field that should be written now. + field_position: usize, + bytes_written: usize, +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> RecordSerializer<'s, 'w, W, S> { + pub fn new( + writer: &'w mut W, + record: &'s RecordSchema, + config: Config<'s, S>, + bytes_written: Option<usize>, + ) -> Self { + Self { + writer, + record, + config, + cache: HashMap::new(), + map_position: None, + field_position: 0, + bytes_written: bytes_written.unwrap_or(0), + } + } + + fn field_error(&self, position: usize, error: Error) -> Error { + let field = &self.record.fields[position]; + let error = match error.into_details() { + Details::SerializeValueWithSchema { + value_type, + value, + schema: _, + } => format!("Failed to serialize value of type `{value_type}`: {value}"), + Details::SerializeRecordFieldWithSchema { + field_name, + record_schema, + error, + } => format!( + "Failed to serialize field '{field_name}' of record {}: {error}", + record_schema.name + ), + Details::MissingDefaultForSkippedField { field_name, schema } => { + format!( + "Missing default for skipped field '{field_name}' for record {}", + schema.name + ) + } + details => format!("{details:?}"), + }; + Error::new(Details::SerializeRecordFieldWithSchema { + field_name: 
field.name.clone(), + record_schema: self.record.clone(), + error, + }) + } + + fn serialize_next_field<T: ?Sized + Serialize>( + &mut self, + position: usize, + value: &T, + ) -> Result<(), Error> { + let field = &self.record.fields[position]; + match self.field_position.cmp(&position) { + Ordering::Equal => { + // Field received in the right order + self.bytes_written += value + .serialize(SchemaAwareSerializer::new( + self.writer, + &field.schema, + self.config, + )?) + .map_err(|e| self.field_error(self.field_position, e))?; + self.field_position += 1; + + // Write any fields that were already received and can now be written + while let Some(bytes) = self.cache.remove(&self.field_position) { + self.writer.write_all(&bytes).map_err(Details::WriteBytes)?; + self.bytes_written += bytes.len(); + self.field_position += 1; + } + + Ok(()) + } + Ordering::Less => { + // Another field needs to be written first, so cache this field + let mut bytes = Vec::new(); + value + .serialize(SchemaAwareSerializer::new( + &mut bytes, + &field.schema, + self.config, + )?) + .map_err(|e| self.field_error(self.field_position, e))?; Review Comment: ```suggestion .map_err(|e| self.field_error(position, e))?; ``` Use the requested position for the error, not the other field's position ########## avro/src/serde/ser_schema/union.rs: ########## @@ -0,0 +1,566 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::{borrow::Borrow, io::Write}; + +use serde::{Serialize, Serializer}; + +use super::{Config, MapOrRecordSerializer, SchemaAwareSerializer}; +use crate::{ + Error, Schema, + error::Details, + schema::{FixedSchema, SchemaKind, UnionSchema}, + serde::{ + ser_schema::{ + block::BlockSerializer, + record::RecordSerializer, + tuple::{ManyTupleSerializer, TupleSerializer}, + }, + with::{BytesType, SER_BYTES_TYPE}, + }, + util::{zig_i32, zig_i64}, +}; + +pub struct UnionSerializer<'s, 'w, W: Write, S: Borrow<Schema>> { + writer: &'w mut W, + union: &'s UnionSchema, + config: Config<'s, S>, +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> UnionSerializer<'s, 'w, W, S> { + pub fn new(writer: &'w mut W, union: &'s UnionSchema, config: Config<'s, S>) -> Self { + UnionSerializer { + writer, + union, + config, + } + } + + fn error(&self, ty: &'static str, error: impl Into<String>) -> Error { + Error::new(Details::SerializeValueWithSchema { + value_type: ty, + value: error.into(), + schema: Schema::Union(self.union.clone()), + }) + } + + /// Write an integer to the writer. + /// + /// This will check that the current schema is [`Schema::Int`] or a logical type based on that. + /// This will write the union index. 
+ pub(super) fn checked_write_int( + &mut self, + original_ty: &'static str, + v: i32, + ) -> Result<usize, Error> { + if let Some(index) = self.union.index_of_schema_kind(SchemaKind::Int) { + let mut bytes_written = zig_i32(index as i32, &mut *self.writer)?; + bytes_written += zig_i32(v, &mut *self.writer)?; + Ok(bytes_written) + } else { + Err(self.error( + original_ty, + "Expected Schema::Int | Schema::Date | Schema::TimeMillis in variants", + ))? + } + } + + /// Write a long to the writer. + /// + /// This will check that the current schema is [`Schema::Long`] or a logical type based on that. + /// This will write the union index. + pub(super) fn checked_write_long( + &mut self, + original_ty: &'static str, + v: i64, + ) -> Result<usize, Error> { + if let Some(index) = self.union.index_of_schema_kind(SchemaKind::Long) { + let mut bytes_written = zig_i32(index as i32, &mut *self.writer)?; + bytes_written += zig_i64(v, &mut *self.writer)?; + Ok(bytes_written) + } else { + Err(self.error(original_ty, "Expected Schema::Long | Schema::TimeMicros | Schema::{,Local}Timestamp{Millis,Micros,Nanos} in variants")) + } + } + + /// Write bytes to the writer with preceding length header. + /// + /// This does not check the current schema and does not write the union index. + fn write_bytes_with_len(&mut self, bytes: &[u8]) -> Result<usize, Error> { + let mut bytes_written = 0; + bytes_written += zig_i64(bytes.len() as i64, &mut *self.writer)?; + bytes_written += self.write_bytes(bytes)?; + Ok(bytes_written) + } + + /// Write bytes to the writer. + /// + /// This does not check the current schema and does not write the union index. + fn write_bytes(&mut self, bytes: &[u8]) -> Result<usize, Error> { + self.writer.write_all(bytes).map_err(Details::WriteBytes)?; + Ok(bytes.len()) + } + + /// Write an array of `n` bytes to the writer. + /// + /// This does not check the current schema and does not write the union index. 
+ fn write_array<const N: usize>(&mut self, bytes: [u8; N]) -> Result<usize, Error> { + self.write_bytes(&bytes)?; + Ok(N) + } +} + +impl<'s, 'w, W: Write, S: Borrow<Schema>> Serializer for UnionSerializer<'s, 'w, W, S> { + type Ok = usize; + type Error = Error; + type SerializeSeq = BlockSerializer<'s, 'w, W, S>; + type SerializeTuple = TupleSerializer<'s, 'w, W, S>; + type SerializeTupleStruct = ManyTupleSerializer<'s, 'w, W, S>; + type SerializeTupleVariant = ManyTupleSerializer<'s, 'w, W, S>; + type SerializeMap = MapOrRecordSerializer<'s, 'w, W, S>; + type SerializeStruct = RecordSerializer<'s, 'w, W, S>; + type SerializeStructVariant = RecordSerializer<'s, 'w, W, S>; + + fn serialize_bool(mut self, v: bool) -> Result<Self::Ok, Self::Error> { + if let Some(index) = self.union.index_of_schema_kind(SchemaKind::Boolean) { + let mut bytes_written = zig_i32(index as i32, &mut *self.writer)?; + bytes_written += self.write_array([u8::from(v)])?; + Ok(bytes_written) + } else { + Err(self.error("bool", "Expected Schema::Boolean in variants"))? + } + } + + fn serialize_i8(mut self, v: i8) -> Result<Self::Ok, Self::Error> { + self.checked_write_int("i8", i32::from(v)) + } + + fn serialize_i16(mut self, v: i16) -> Result<Self::Ok, Self::Error> { + self.checked_write_int("i16", i32::from(v)) + } + + fn serialize_i32(mut self, v: i32) -> Result<Self::Ok, Self::Error> { + self.checked_write_int("i32", v) + } + + fn serialize_i64(mut self, v: i64) -> Result<Self::Ok, Self::Error> { + self.checked_write_long("i64", v) + } + + fn serialize_i128(mut self, v: i128) -> Result<Self::Ok, Self::Error> { + match self.union.find_named_schema("i128", self.config.names)? { + Some((index, Schema::Fixed(FixedSchema { size: 16, .. 
}))) => { + let mut bytes_written = zig_i32(index as i32, &mut *self.writer)?; + bytes_written += self.write_array(v.to_le_bytes())?; + Ok(bytes_written) + } + _ => Err(self.error( + "i128", + r#"Expected Schema::Fixed(name: "i128", size: 16) in variants"#, + )), + } + } + + fn serialize_u8(mut self, v: u8) -> Result<Self::Ok, Self::Error> { + self.checked_write_int("u8", i32::from(v)) + } + + fn serialize_u16(mut self, v: u16) -> Result<Self::Ok, Self::Error> { + self.checked_write_int("u16", i32::from(v)) + } + + fn serialize_u32(mut self, v: u32) -> Result<Self::Ok, Self::Error> { + self.checked_write_long("u32", i64::from(v)) + } + + fn serialize_u64(mut self, v: u64) -> Result<Self::Ok, Self::Error> { + match self.union.find_named_schema("u64", self.config.names)? { + Some((index, Schema::Fixed(FixedSchema { size: 16, .. }))) => { Review Comment: ```suggestion Some((index, Schema::Fixed(FixedSchema { size: 8, .. }))) => { ``` ########## avro/src/serde/ser_schema/record/field_default.rs: ########## @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use serde::{Serialize, Serializer, ser::Error}; +use serde_json::Value; + +use crate::{ + Schema, + schema::{SchemaKind, UnionSchema, UuidSchema}, + serde::ser_schema::SERIALIZING_SCHEMA_DEFAULT, +}; + +pub struct SchemaAwareRecordFieldDefault<'v, 's> { + value: &'v Value, + schema: &'s Schema, +} + +impl<'v, 's> SchemaAwareRecordFieldDefault<'v, 's> { + pub fn new(value: &'v Value, schema: &'s Schema) -> Self { + SchemaAwareRecordFieldDefault { value, schema } + } + + fn serialize_as_newtype_variant<S: Serializer>( + &self, + serializer: S, + index: usize, + union: &'s UnionSchema, + ) -> Result<S::Ok, S::Error> { + let value = Self::new(self.value, &union.variants()[index]); + serializer.serialize_newtype_variant( + SERIALIZING_SCHEMA_DEFAULT, + index as u32, + SERIALIZING_SCHEMA_DEFAULT, + &value, + ) + } + + fn recursive_type_check(value: &Value, schema: &'s Schema) -> bool { + match (value, schema) { + (Value::Null, Schema::Null) + | (Value::Bool(_), Schema::Boolean) + | (Value::String(_), Schema::Bytes | Schema::String) => true, + (Value::Number(n), Schema::Int | Schema::Date | Schema::TimeMillis) if n.is_i64() => { + let long = n.as_i64().unwrap(); + i32::try_from(long).is_ok() + } + ( + Value::Number(n), + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos, + ) if n.is_i64() => true, + (Value::Number(n), Schema::Float | Schema::Double) if n.is_f64() => true, + (Value::String(s), Schema::Fixed(fixed)) => s.len() == fixed.size, + (Value::String(s), Schema::Enum(enum_schema)) => enum_schema.symbols.contains(s), + (Value::Object(o), Schema::Record(record)) => record.fields.iter().all(|field| { + if let Some(value) = o.get(&field.name) { + Self::recursive_type_check(value, &field.schema) + } else { + field.default.is_some() + } + }), + (Value::Object(o), Schema::Map(map)) => o + .values() + 
.all(|value| Self::recursive_type_check(value, &map.types)), + (Value::Array(a), Schema::Array(array)) => a + .iter() + .all(|value| Self::recursive_type_check(value, &array.items)), + (_, Schema::Union(union)) => union + .variants() + .iter() + .any(|variant| Self::recursive_type_check(value, variant)), + _ => false, + } + } +} + +impl<'v, 's> Serialize for SchemaAwareRecordFieldDefault<'v, 's> { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + match (&self.value, self.schema) { + (Value::Null, Schema::Null) => serializer.serialize_unit(), + (Value::Bool(boolean), Schema::Boolean) => serializer.serialize_bool(*boolean), + (Value::Number(n), Schema::Int | Schema::Date | Schema::TimeMillis) if n.is_i64() => { + let long = n.as_i64().unwrap(); + let int = i32::try_from(long).map_err(|_| { + S::Error::custom(format!("Default {long} is too large for {:?}", self.schema)) + })?; + serializer.serialize_i32(int) + } + ( + Value::Number(n), + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos, + ) if n.is_i64() => { + let long = n.as_i64().unwrap(); + serializer.serialize_i64(long) + } + (Value::Number(n), Schema::Float) if n.is_f64() => { + let float = n.as_f64().unwrap(); + serializer.serialize_f32(float as f32) + } + (Value::Number(n), Schema::Double) if n.is_f64() => { Review Comment: ```suggestion (Value::Number(n), Schema::Double) if n.as_f64().is_some() => { ``` ########## avro/src/writer/datum.rs: ########## @@ -59,6 +62,19 @@ impl<'s> GenericDatumWriter<'s> { /// written data unreadable. #[builder(default = true)] validate: bool, + /// At what block size to start a new block (for arrays and maps). + /// + /// This is a minimum value, the block size will always be larger than this except for the last + /// block. 
+ /// + /// When set to `None` all values will be written in a single block. This can be faster as no + /// intermediate buffer is used, but seeking through written data will be slower. + target_block_size: Option<usize>, + /// Should [`Serialize`] implementations pick a human readable represenation. Review Comment: ```suggestion /// Should [`Serialize`] implementations pick a human readable representation. ``` ########## avro/src/serde/ser_schema/record/field_default.rs: ########## @@ -0,0 +1,219 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +use serde::{Serialize, Serializer, ser::Error}; +use serde_json::Value; + +use crate::{ + Schema, + schema::{SchemaKind, UnionSchema, UuidSchema}, + serde::ser_schema::SERIALIZING_SCHEMA_DEFAULT, +}; + +pub struct SchemaAwareRecordFieldDefault<'v, 's> { + value: &'v Value, + schema: &'s Schema, +} + +impl<'v, 's> SchemaAwareRecordFieldDefault<'v, 's> { + pub fn new(value: &'v Value, schema: &'s Schema) -> Self { + SchemaAwareRecordFieldDefault { value, schema } + } + + fn serialize_as_newtype_variant<S: Serializer>( + &self, + serializer: S, + index: usize, + union: &'s UnionSchema, + ) -> Result<S::Ok, S::Error> { + let value = Self::new(self.value, &union.variants()[index]); + serializer.serialize_newtype_variant( + SERIALIZING_SCHEMA_DEFAULT, + index as u32, + SERIALIZING_SCHEMA_DEFAULT, + &value, + ) + } + + fn recursive_type_check(value: &Value, schema: &'s Schema) -> bool { + match (value, schema) { + (Value::Null, Schema::Null) + | (Value::Bool(_), Schema::Boolean) + | (Value::String(_), Schema::Bytes | Schema::String) => true, + (Value::Number(n), Schema::Int | Schema::Date | Schema::TimeMillis) if n.is_i64() => { + let long = n.as_i64().unwrap(); + i32::try_from(long).is_ok() + } + ( + Value::Number(n), + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos, + ) if n.is_i64() => true, + (Value::Number(n), Schema::Float | Schema::Double) if n.is_f64() => true, + (Value::String(s), Schema::Fixed(fixed)) => s.len() == fixed.size, + (Value::String(s), Schema::Enum(enum_schema)) => enum_schema.symbols.contains(s), + (Value::Object(o), Schema::Record(record)) => record.fields.iter().all(|field| { + if let Some(value) = o.get(&field.name) { + Self::recursive_type_check(value, &field.schema) + } else { + field.default.is_some() + } + }), + (Value::Object(o), Schema::Map(map)) => o + .values() + 
.all(|value| Self::recursive_type_check(value, &map.types)), + (Value::Array(a), Schema::Array(array)) => a + .iter() + .all(|value| Self::recursive_type_check(value, &array.items)), + (_, Schema::Union(union)) => union + .variants() + .iter() + .any(|variant| Self::recursive_type_check(value, variant)), + _ => false, + } + } +} + +impl<'v, 's> Serialize for SchemaAwareRecordFieldDefault<'v, 's> { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + match (&self.value, self.schema) { + (Value::Null, Schema::Null) => serializer.serialize_unit(), + (Value::Bool(boolean), Schema::Boolean) => serializer.serialize_bool(*boolean), + (Value::Number(n), Schema::Int | Schema::Date | Schema::TimeMillis) if n.is_i64() => { + let long = n.as_i64().unwrap(); + let int = i32::try_from(long).map_err(|_| { + S::Error::custom(format!("Default {long} is too large for {:?}", self.schema)) + })?; + serializer.serialize_i32(int) + } + ( + Value::Number(n), + Schema::Long + | Schema::TimeMicros + | Schema::TimestampMillis + | Schema::TimestampMicros + | Schema::TimestampNanos + | Schema::LocalTimestampMillis + | Schema::LocalTimestampMicros + | Schema::LocalTimestampNanos, + ) if n.is_i64() => { + let long = n.as_i64().unwrap(); + serializer.serialize_i64(long) + } + (Value::Number(n), Schema::Float) if n.is_f64() => { + let float = n.as_f64().unwrap(); + serializer.serialize_f32(float as f32) + } + (Value::Number(n), Schema::Double) if n.is_f64() => { + let double = n.as_f64().unwrap(); + serializer.serialize_f64(double) + } + ( + Value::String(s), + Schema::Bytes + | Schema::Fixed(_) + | Schema::Uuid(UuidSchema::Bytes | UuidSchema::Fixed(_)) + | Schema::BigDecimal + | Schema::Decimal(_) + | Schema::Duration(_) + | Schema::Date, + ) => serializer.serialize_bytes(s.as_bytes()), + (Value::String(s), Schema::String | Schema::Uuid(UuidSchema::String)) => { + serializer.serialize_str(s) + } + (Value::String(s), Schema::Enum(enum_schema)) => { + 
let Some((variant_index, _)) = enum_schema + .symbols + .iter() + .enumerate() + .find(|(_i, symbol)| *symbol == s) + else { + return Err(S::Error::custom(format!( + "Could not find `{s}` in enum: {enum_schema:?}" + ))); + }; + + serializer.serialize_unit_variant( + SERIALIZING_SCHEMA_DEFAULT, + variant_index as u32, + SERIALIZING_SCHEMA_DEFAULT, + ) + } + // This abuses the support for flattened fields, which are also serialized as a map. + (Value::Object(o), Schema::Record(record)) => serializer.collect_map( + o.iter() + .enumerate() + .map(|(i, (k, v))| (k, Self::new(v, &record.fields[i].schema))), + ), Review Comment: ```suggestion (Value::Object(o), Schema::Record(record)) => serializer.collect_map( record.fields.iter().filter_map(|field| { o.get(&field.name) .map(|value| (&field.name, Self::new(value, &field.schema))) }), ), ``` to serialize the defaults in the schema order -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
