ZENOTME commented on code in PR #82: URL: https://github.com/apache/iceberg-rust/pull/82#discussion_r1367766620
########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,552 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: 
Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { 
+ Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. 
+ required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, 
&field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + 
("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!("raw literal ({}) fail convert to {:?}", v, ty), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => Err(invalid_err("float")), + }, + RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO + // rust avro don't support deserialize any bytes representation now. 
+ RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), + RawLiteralEnum::List(v) => match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::<Result<_, Error>>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + // In deserialize, the element always be `Som`e. `None` will be represented + // as `Some(RawLiteral::Null)` + let k_v = k_v.ok_or_else(|| invalid_err("list"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err("list")); + } + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == "key" { + key = Some(v); + } else if k == "value" { + value = Some(v); + } + }); + match (key, value) { + (Some(k), Some(v)) => { + map.insert( + k.try_into(key_ty)? 
+ .ok_or_else(|| invalid_err("list"))?, + v.try_into(value_ty)?, + ); + } + _ => return Err(invalid_err("list")), + } + } else { + return Err(invalid_err("list")); + } + } + Ok(Some(Literal::Map(map))) + } + _ => Err(invalid_err("list")), + }, + RawLiteralEnum::Record(Record { + required, + optional: _, + }) => match ty { + Type::Struct(struct_ty) => { + let iters: Vec<(i32, Option<Literal>, String)> = required + .into_iter() + .map(|(field_name, value)| { + let field = struct_ty + .field_by_name(field_name.as_str()) + .ok_or_else(|| invalid_err("record"))?; + let value = value.try_into(&field.field_type)?; + Ok((field.id, value, field.name.clone())) + }) + .collect::<Result<_, Error>>()?; + Ok(Some(Literal::Struct(super::Struct::from_iter(iters)))) + } + Type::Map(map_ty) => { Review Comment: In Avro, [`Value::Record` and `Value::Map` are both deserialized through the visitor's `visit_map`](https://github.com/apache/avro/blob/369ae568f1017b54fc00bd2f49fd2ebe94fb1b2e/lang/rust/avro/src/de.rs#L268), so the two cannot be told apart at that point. We therefore first convert both into `RawLiteralEnum::Record`, and then use the type info to decide whether to convert it into a `Literal::Struct` or a `Literal::Map`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org