liurenjie1024 commented on code in PR #82: URL: https://github.com/apache/iceberg-rust/pull/82#discussion_r1368190040
########## crates/iceberg/src/avro/schema.rs: ########## @@ -431,14 +456,35 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { AvroSchema::Date => Type::Primitive(PrimitiveType::Date), AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time), AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp), - AvroSchema::Uuid => Type::Primitive(PrimitiveType::Uuid), AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean), AvroSchema::Int => Type::Primitive(PrimitiveType::Int), AvroSchema::Long => Type::Primitive(PrimitiveType::Long), AvroSchema::Float => Type::Primitive(PrimitiveType::Float), AvroSchema::Double => Type::Primitive(PrimitiveType::Double), AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String), - AvroSchema::Fixed(fixed) => Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)), + AvroSchema::Fixed(fixed) => { + if let Some(logical_type) = fixed.attributes.get(LOGICAL_TYPE) { + let logical_type = logical_type.as_str().ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "logicalType in attributes of avro schema is not a string type", + ) + })?; + match logical_type { + "uuid" => Type::Primitive(PrimitiveType::Uuid), Review Comment: ```suggestion UUID_LOGICAL_TYPE => Type::Primitive(PrimitiveType::Uuid), ``` ########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,559 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. 
+ pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { Review Comment: I think `Record` should contain a `StructType`: ``` struct Record<'a> { r#type: &'a StructType, values: Vec<Option<RawLiteralEnum>> } ``` This way we can avoid copying field names every time. But we can leave that as a follow-up optimization. ########## crates/iceberg/src/spec/values.rs: ########## @@ -620,6 +622,47 @@ impl Struct { } } +/// An iterator that moves out of a struct. +pub struct StructIntoIter { + null_bitmap: bitvec::boxed::IntoIter, + fields: std::vec::IntoIter<Literal>, + field_ids: std::vec::IntoIter<i32>, + field_names: std::vec::IntoIter<String>, +} + +impl Iterator for StructIntoIter { + type Item = (i32, Option<Literal>, String); Review Comment: If we remove struct types from `Struct`, we will also no longer have a `field_id`.
########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,559 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: 
Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { 
+ Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. 
+ required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, 
&field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + 
("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Unable to convert raw literal ({}) fail convert to type {}", + v, ty + ), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => Err(invalid_err("float")), + }, + RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO + // rust avro don't support deserialize any bytes representation now. 
+ RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), + RawLiteralEnum::List(v) => match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::<Result<_, Error>>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + // In deserialize, the element always be `Some`. `None` will be represented + // as `Some(RawLiteral::Null)` + let k_v = k_v.ok_or_else(|| invalid_err("list"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err("list")); + } + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == "key" { + key = Some(v); + } else if k == "value" { + value = Some(v); + } + }); + match (key, value) { + (Some(k), Some(v)) => { + let key = k + .try_into(key_ty)? + .ok_or_else(|| invalid_err("list"))?; + let value = v.try_into(value_ty)?; + if map_ty.value_field.required && value.is_none() { + return Err(invalid_err("list")); Review Comment: I think we should make the error here clearer, e.g. by stating that the map value is required but was null, instead of the generic "list" conversion error. ########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,559 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer.
+ pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: 
S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { 
+ Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. + required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), 
+ super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, &field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } 
else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + ("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Unable to convert raw literal ({}) fail convert to type {}", + v, ty + ), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + 
Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => Err(invalid_err("float")), + }, + RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO + // rust avro don't support deserialize any bytes representation now. + RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), + RawLiteralEnum::List(v) => match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::<Result<_, Error>>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + // In deserialize, the element always be `Some`. 
`None` will be represented + // as `Some(RawLiteral::Null)` + let k_v = k_v.ok_or_else(|| invalid_err("list"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err("list")); + } + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == "key" { + key = Some(v); + } else if k == "value" { Review Comment: ```suggestion } else if k == MAP_VALUE_FIELD_NAME { ``` ########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,552 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. 
+ pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + 
"Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: 
serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. + required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = 
Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, &field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = 
map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + ("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!("raw literal ({}) fail convert to {:?}", v, ty), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => Err(invalid_err("float")), + }, + 
RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO + // rust avro don't support deserialize any bytes representation now. + RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), + RawLiteralEnum::List(v) => match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::<Result<_, Error>>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + // In deserialize, the element always be `Som`e. `None` will be represented + // as `Some(RawLiteral::Null)` + let k_v = k_v.ok_or_else(|| invalid_err("list"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err("list")); + } + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == "key" { + key = Some(v); + } else if k == "value" { + value = Some(v); + } + }); + match (key, value) { + (Some(k), Some(v)) => { + map.insert( + k.try_into(key_ty)? 
+ .ok_or_else(|| invalid_err("list"))?, + v.try_into(value_ty)?, + ); + } + _ => return Err(invalid_err("list")), + } + } else { + return Err(invalid_err("list")); + } + } + Ok(Some(Literal::Map(map))) + } + _ => Err(invalid_err("list")), + }, + RawLiteralEnum::Record(Record { + required, + optional: _, + }) => match ty { + Type::Struct(struct_ty) => { + let iters: Vec<(i32, Option<Literal>, String)> = required + .into_iter() + .map(|(field_name, value)| { + let field = struct_ty + .field_by_name(field_name.as_str()) + .ok_or_else(|| invalid_err("record"))?; + let value = value.try_into(&field.field_type)?; + Ok((field.id, value, field.name.clone())) + }) + .collect::<Result<_, Error>>()?; + Ok(Some(Literal::Struct(super::Struct::from_iter(iters)))) + } + Type::Map(map_ty) => { Review Comment: I see, thanks! ########## crates/iceberg/src/spec/values.rs: ########## @@ -1309,4 +1928,256 @@ mod tests { &Type::Primitive(PrimitiveType::String), ); } + + #[test] + fn avro_convert_test_int() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Int(32)), + &Type::Primitive(PrimitiveType::Int), + ); + } + + #[test] + fn avro_convert_test_long() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Long(32)), + &Type::Primitive(PrimitiveType::Long), + ); + } + + #[test] + fn avro_convert_test_float() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Float(OrderedFloat(1.0))), + &Type::Primitive(PrimitiveType::Float), + ); + } + + #[test] + fn avro_convert_test_double() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Double(OrderedFloat(1.0))), + &Type::Primitive(PrimitiveType::Double), + ); + } + + #[test] + fn avro_convert_test_string() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::String("iceberg".to_string())), + &Type::Primitive(PrimitiveType::String), + ); + } + + #[test] + fn avro_convert_test_date() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Date(17486)), 
+ &Type::Primitive(PrimitiveType::Date), + ); + } + + #[test] + fn avro_convert_test_time() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Time(81068123456)), + &Type::Primitive(PrimitiveType::Time), + ); + } + + #[test] + fn avro_convert_test_timestamp() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::Timestamp(1510871468123456)), + &Type::Primitive(PrimitiveType::Timestamp), + ); + } + + #[test] + fn avro_convert_test_timestamptz() { + check_convert_with_avro( + Literal::Primitive(PrimitiveLiteral::TimestampTZ(1510871468123456)), + &Type::Primitive(PrimitiveType::Timestamptz), + ); + } + + #[test] + fn avro_convert_test_list() { + check_convert_with_avro( + Literal::List(vec![ + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + Some(Literal::Primitive(PrimitiveLiteral::Int(3))), + None, + ]), + &Type::List(ListType { + element_field: NestedField::list_element( + 0, + Type::Primitive(PrimitiveType::Int), + false, + ) + .into(), + }), + ); + + check_convert_with_avro( + Literal::List(vec![ + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + Some(Literal::Primitive(PrimitiveLiteral::Int(3))), + ]), + &Type::List(ListType { + element_field: NestedField::list_element( + 0, + Type::Primitive(PrimitiveType::Int), + true, + ) + .into(), + }), + ); + } + + #[test] + fn avro_convert_test_map() { + check_convert_with_avro( + Literal::Map(BTreeMap::from([ + ( + Literal::Primitive(PrimitiveLiteral::Int(1)), + Some(Literal::Primitive(PrimitiveLiteral::Long(1))), + ), + ( + Literal::Primitive(PrimitiveLiteral::Int(2)), + Some(Literal::Primitive(PrimitiveLiteral::Long(2))), + ), + (Literal::Primitive(PrimitiveLiteral::Int(3)), None), + ])), + &Type::Map(MapType { + key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::Int)) + .into(), + value_field: NestedField::map_value_element( + 1, + 
Type::Primitive(PrimitiveType::Long), + false, + ) + .into(), + }), + ); + + check_convert_with_avro( + Literal::Map(BTreeMap::from([ + ( + Literal::Primitive(PrimitiveLiteral::Int(1)), + Some(Literal::Primitive(PrimitiveLiteral::Long(1))), + ), + ( + Literal::Primitive(PrimitiveLiteral::Int(2)), + Some(Literal::Primitive(PrimitiveLiteral::Long(2))), + ), + ( + Literal::Primitive(PrimitiveLiteral::Int(3)), + Some(Literal::Primitive(PrimitiveLiteral::Long(3))), + ), + ])), + &Type::Map(MapType { + key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::Int)) + .into(), + value_field: NestedField::map_value_element( + 1, + Type::Primitive(PrimitiveType::Long), + true, + ) + .into(), + }), + ); + } + + #[test] + fn avro_convert_test_string_map() { + check_convert_with_avro( + Literal::Map(BTreeMap::from([ + ( + Literal::Primitive(PrimitiveLiteral::String("a".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("b".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("c".to_string())), + None, + ), + ])), + &Type::Map(MapType { + key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::String)) + .into(), + value_field: NestedField::map_value_element( + 1, + Type::Primitive(PrimitiveType::Int), + false, + ) + .into(), + }), + ); + + check_convert_with_avro( + Literal::Map(BTreeMap::from([ + ( + Literal::Primitive(PrimitiveLiteral::String("a".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("b".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(2))), + ), + ( + Literal::Primitive(PrimitiveLiteral::String("c".to_string())), + Some(Literal::Primitive(PrimitiveLiteral::Int(3))), + ), + ])), + &Type::Map(MapType { + key_field: NestedField::map_key_element(0, Type::Primitive(PrimitiveType::String)) + .into(), + 
value_field: NestedField::map_value_element( + 1, + Type::Primitive(PrimitiveType::Int), + true, + ) + .into(), + }), + ); + } + + #[test] + fn avro_convert_test_record() { + check_convert_with_avro( + Literal::Struct(Struct::from_iter(vec![ + ( + 1, + Some(Literal::Primitive(PrimitiveLiteral::Int(1))), + "id".to_string(), + ), + ( + 2, + Some(Literal::Primitive(PrimitiveLiteral::String( + "bar".to_string(), + ))), + "name".to_string(), + ), + (3, None, "address".to_string()), + ])), + &Type::Struct(StructType::new(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::optional(3, "address", Type::Primitive(PrimitiveType::String)).into(), + ])), + ); + } + + // # TODO + // rust avro don't support deserialize any bytes representation now: + // - binary + // - fixed + // - decimal + // - uuid Review Comment: I think we can still test them. Aren't they encoded as bytes when converting from a literal to a raw literal? ########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,559 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. 
+ pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + 
"Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: 
serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. + required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = 
Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, &field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = 
map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + ("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Unable to convert raw literal ({}) fail convert to type {}", + v, ty + ), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => 
Err(invalid_err("float")), + }, + RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO + // rust avro don't support deserialize any bytes representation now. + RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), + RawLiteralEnum::List(v) => match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::<Result<_, Error>>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + // In deserialize, the element always be `Some`. `None` will be represented + // as `Some(RawLiteral::Null)` + let k_v = k_v.ok_or_else(|| invalid_err("list"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err("list")); + } + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == "key" { Review Comment: ```suggestion if k == MAP_KEY_FIELD_NAME { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org