liurenjie1024 commented on code in PR #82: URL: https://github.com/apache/iceberg-rust/pull/82#discussion_r1365064528
########## crates/iceberg/src/avro/schema.rs: ########## @@ -203,8 +197,8 @@ impl SchemaVisitor for SchemaToAvroSchema { PrimitiveType::Timestamp => AvroSchema::TimestampMicros, PrimitiveType::Timestamptz => AvroSchema::TimestampMicros, PrimitiveType::String => AvroSchema::String, - PrimitiveType::Uuid => AvroSchema::Uuid, - PrimitiveType::Fixed(len) => avro_fixed_schema((*len) as usize)?, + PrimitiveType::Uuid => avro_fixed_schema(16, Some("uuid"))?, Review Comment: ```suggestion PrimitiveType::Uuid => avro_fixed_schema(UUID_BYTES, Some(UUID_LOGICAL_TYPE))?, ``` ########## crates/iceberg/src/spec/values.rs: ########## @@ -620,6 +622,47 @@ impl Struct { } } +/// An iterator that moves out of a struct. +pub struct StructIntoIter { Review Comment: ```suggestion pub struct StructValueIntoIter { ``` ########## crates/iceberg/src/avro/schema.rs: ########## @@ -223,13 +217,38 @@ pub(crate) fn schema_to_avro_schema(name: impl ToString, schema: &Schema) -> Res visit_schema(schema, &mut converter).map(Either::unwrap_left) } -pub(crate) fn avro_fixed_schema(len: usize) -> Result<AvroSchema> { +fn avro_record_schema(name: &str, fields: Vec<AvroRecordField>) -> Result<AvroSchema> { + let lookup = fields + .iter() + .enumerate() + .map(|f| (f.1.name.clone(), f.0)) + .collect(); + + Ok(AvroSchema::Record(RecordSchema { + name: Name::new(name)?, + aliases: None, + doc: None, + fields, + lookup, + attributes: Default::default(), + })) +} + +pub(crate) fn avro_fixed_schema(len: usize, logical_type: Option<&str>) -> Result<AvroSchema> { + let attributes = if let Some(logical_type) = logical_type { + BTreeMap::from([( + "logicalType".to_string(), Review Comment: Make this a const? ########## crates/iceberg/src/spec/values.rs: ########## @@ -620,6 +622,47 @@ impl Struct { } } +/// An iterator that moves out of a struct. 
+pub struct StructIntoIter { + null_bitmap: bitvec::boxed::IntoIter, + fields: std::vec::IntoIter<Literal>, + field_ids: std::vec::IntoIter<i32>, + field_names: std::vec::IntoIter<String>, +} + +impl Iterator for StructIntoIter { + type Item = (i32, Option<Literal>, String); Review Comment: I think the item should be `Option<Literal>`. According to [this discussion](https://github.com/apache/iceberg-rust/issues/77), we will remove struct types in `Struct`, and then we won't be able to return the field name. ########## crates/iceberg/src/avro/schema.rs: ########## @@ -431,14 +451,35 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { AvroSchema::Date => Type::Primitive(PrimitiveType::Date), AvroSchema::TimeMicros => Type::Primitive(PrimitiveType::Time), AvroSchema::TimestampMicros => Type::Primitive(PrimitiveType::Timestamp), - AvroSchema::Uuid => Type::Primitive(PrimitiveType::Uuid), AvroSchema::Boolean => Type::Primitive(PrimitiveType::Boolean), AvroSchema::Int => Type::Primitive(PrimitiveType::Int), AvroSchema::Long => Type::Primitive(PrimitiveType::Long), AvroSchema::Float => Type::Primitive(PrimitiveType::Float), AvroSchema::Double => Type::Primitive(PrimitiveType::Double), AvroSchema::String | AvroSchema::Enum(_) => Type::Primitive(PrimitiveType::String), - AvroSchema::Fixed(fixed) => Type::Primitive(PrimitiveType::Fixed(fixed.size as u64)), + AvroSchema::Fixed(fixed) => { + if let Some(logical_type) = fixed.attributes.get("logicalType") { Review Comment: As above, please use constants instead of magic strings. 
########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,552 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: 
Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { 
+ Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. 
+ required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, 
&field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + 
("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!("raw literal ({}) fail convert to {:?}", v, ty), Review Comment: ```suggestion format!("Unable to convert raw literal ({}) fail convert to type {}", v, ty), ``` ########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,552 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. 
+ pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + 
"Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: 
serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. + required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = 
Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, &field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = 
map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + ("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!("raw literal ({}) fail convert to {:?}", v, ty), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => Err(invalid_err("float")), + }, + 
RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO + // rust avro don't support deserialize any bytes representation now. + RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), + RawLiteralEnum::List(v) => match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::<Result<_, Error>>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + // In deserialize, the element always be `Som`e. `None` will be represented + // as `Some(RawLiteral::Null)` + let k_v = k_v.ok_or_else(|| invalid_err("list"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err("list")); + } + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == "key" { + key = Some(v); + } else if k == "value" { + value = Some(v); + } + }); + match (key, value) { + (Some(k), Some(v)) => { + map.insert( + k.try_into(key_ty)? 
+ .ok_or_else(|| invalid_err("list"))?, + v.try_into(value_ty)?, + ); + } + _ => return Err(invalid_err("list")), + } + } else { + return Err(invalid_err("list")); + } + } + Ok(Some(Literal::Map(map))) + } + _ => Err(invalid_err("list")), + }, + RawLiteralEnum::Record(Record { + required, + optional: _, + }) => match ty { + Type::Struct(struct_ty) => { + let iters: Vec<(i32, Option<Literal>, String)> = required + .into_iter() + .map(|(field_name, value)| { + let field = struct_ty + .field_by_name(field_name.as_str()) + .ok_or_else(|| invalid_err("record"))?; + let value = value.try_into(&field.field_type)?; + Ok((field.id, value, field.name.clone())) + }) + .collect::<Result<_, Error>>()?; + Ok(Some(Literal::Struct(super::Struct::from_iter(iters)))) + } + Type::Map(map_ty) => { Review Comment: Is this possible? When will this happen? ########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,552 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. 
+ pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + 
"Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: 
serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. + required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = 
Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, &field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = 
map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + ("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!("raw literal ({}) fail convert to {:?}", v, ty), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => Err(invalid_err("float")), + }, + 
RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO + // rust avro don't support deserialize any bytes representation now. + RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), + RawLiteralEnum::List(v) => match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::<Result<_, Error>>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + // In deserialize, the element always be `Som`e. `None` will be represented + // as `Some(RawLiteral::Null)` + let k_v = k_v.ok_or_else(|| invalid_err("list"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err("list")); + } + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == "key" { + key = Some(v); + } else if k == "value" { + value = Some(v); + } + }); + match (key, value) { + (Some(k), Some(v)) => { Review Comment: I think whether value could be `None` depends on `map_value_field.required`? 
########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,552 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: 
Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { 
+ Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. 
+ required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, 
&field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + 
("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!("raw literal ({}) fail convert to {:?}", v, ty), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => Err(invalid_err("float")), + }, + RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO + // rust avro don't support deserialize any bytes representation now. 
+ RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), + RawLiteralEnum::List(v) => match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::<Result<_, Error>>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + // In deserialize, the element always be `Som`e. `None` will be represented Review Comment: ```suggestion // In deserialize, the element always be `Some`. `None` will be represented ``` ########## crates/iceberg/src/spec/values.rs: ########## @@ -996,10 +1039,552 @@ mod timestamptz { } } +mod _serde { + use std::collections::BTreeMap; + + use crate::{ + spec::{PrimitiveType, Type}, + Error, ErrorKind, + }; + + use super::{Literal, PrimitiveLiteral}; + use serde::{ + de::Visitor, + ser::{SerializeMap, SerializeSeq, SerializeStruct}, + Deserialize, Serialize, + }; + use serde_bytes::ByteBuf; + use serde_derive::Deserialize as DeserializeDerive; + use serde_derive::Serialize as SerializeDerive; + + #[derive(SerializeDerive, DeserializeDerive)] + #[serde(transparent)] + /// Raw literal representation used for serde. The serialize way is used for Avro serializer. + pub struct RawLiteral(RawLiteralEnum); + + impl RawLiteral { + /// Covert literal to raw literal. + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + Ok(Self(RawLiteralEnum::try_from(literal, ty)?)) + } + + /// Convert raw literal to literal. 
+ pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + self.0.try_into(ty) + } + } + + #[derive(SerializeDerive, Clone)] + #[serde(untagged)] + enum RawLiteralEnum { + Null, + Boolean(bool), + Int(i32), + Long(i64), + Float(f32), + Double(f64), + String(String), + Bytes(ByteBuf), + List(List), + StringMap(StringMap), + Record(Record), + } + + #[derive(Clone)] + struct Record { + required: Vec<(String, RawLiteralEnum)>, + optional: Vec<(String, Option<RawLiteralEnum>)>, + } + + impl Serialize for Record { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let len = self.required.len() + self.optional.len(); + let mut record = serializer.serialize_struct("", len)?; + for (k, v) in &self.required { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + for (k, v) in &self.optional { + record.serialize_field(Box::leak(k.clone().into_boxed_str()), &v)?; + } + record.end() + } + } + + #[derive(Clone)] + struct List { + list: Vec<Option<RawLiteralEnum>>, + required: bool, + } + + impl Serialize for List { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut seq = serializer.serialize_seq(Some(self.list.len()))?; + for value in &self.list { + if self.required { + seq.serialize_element(value.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + "List element is required, element cannot be null", + ) + })?)?; + } else { + seq.serialize_element(&value)?; + } + } + seq.end() + } + } + + #[derive(Clone)] + struct StringMap { + raw: Vec<(String, Option<RawLiteralEnum>)>, + required: bool, + } + + impl Serialize for StringMap { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let mut map = serializer.serialize_map(Some(self.raw.len()))?; + for (k, v) in &self.raw { + if self.required { + map.serialize_entry( + k, + v.as_ref().ok_or_else(|| { + serde::ser::Error::custom( + 
"Map element is required, element cannot be null", + ) + })?, + )?; + } else { + map.serialize_entry(k, v)?; + } + } + map.end() + } + } + + impl<'de> Deserialize<'de> for RawLiteralEnum { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + struct RawLiteralVisitor; + impl<'de> Visitor<'de> for RawLiteralVisitor { + type Value = RawLiteralEnum; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("expect") + } + + fn visit_bool<E>(self, v: bool) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Boolean(v)) + } + + fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Int(v)) + } + + fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v)) + } + + /// Used in json + fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Long(v as i64)) + } + + fn visit_f32<E>(self, v: f32) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Float(v)) + } + + fn visit_f64<E>(self, v: f64) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Double(v)) + } + + fn visit_str<E>(self, v: &str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_bytes<E>(self, v: &[u8]) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Bytes(ByteBuf::from(v))) + } + + fn visit_borrowed_str<E>(self, v: &'de str) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::String(v.to_string())) + } + + fn visit_unit<E>(self) -> Result<Self::Value, E> + where + E: serde::de::Error, + { + Ok(RawLiteralEnum::Null) + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: 
serde::de::MapAccess<'de>, + { + let mut required = Vec::new(); + while let Some(key) = map.next_key::<String>()? { + let value = map.next_value::<RawLiteralEnum>()?; + required.push((key, value)); + } + Ok(RawLiteralEnum::Record(Record { + required, + optional: Vec::new(), + })) + } + + fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error> + where + A: serde::de::SeqAccess<'de>, + { + let mut list = Vec::new(); + while let Some(value) = seq.next_element::<RawLiteralEnum>()? { + list.push(Some(value)); + } + Ok(RawLiteralEnum::List(List { + list, + // `required` only used in serialize, just set default in deserialize. + required: false, + })) + } + } + deserializer.deserialize_any(RawLiteralVisitor) + } + } + + impl RawLiteralEnum { + pub fn try_from(literal: Literal, ty: &Type) -> Result<Self, Error> { + let raw = match literal { + Literal::Primitive(prim) => match prim { + super::PrimitiveLiteral::Boolean(v) => RawLiteralEnum::Boolean(v), + super::PrimitiveLiteral::Int(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Long(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Float(v) => RawLiteralEnum::Float(v.0), + super::PrimitiveLiteral::Double(v) => RawLiteralEnum::Double(v.0), + super::PrimitiveLiteral::Date(v) => RawLiteralEnum::Int(v), + super::PrimitiveLiteral::Time(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::Timestamp(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::TimestampTZ(v) => RawLiteralEnum::Long(v), + super::PrimitiveLiteral::String(v) => RawLiteralEnum::String(v), + super::PrimitiveLiteral::UUID(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.as_u128().to_be_bytes())) + } + super::PrimitiveLiteral::Fixed(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Binary(v) => RawLiteralEnum::Bytes(ByteBuf::from(v)), + super::PrimitiveLiteral::Decimal(v) => { + RawLiteralEnum::Bytes(ByteBuf::from(v.to_le_bytes())) + } + }, + Literal::Struct(r#struct) => { + let mut required = 
Vec::new(); + let mut optional = Vec::new(); + if let Type::Struct(sturct_ty) = ty { + for (id, value, field_name) in r#struct.into_iter() { + let field = sturct_ty.field_by_id(id).ok_or_else(|| { + Error::new(ErrorKind::DataInvalid, "field not found") + })?; + if field.required { + if let Some(value) = value { + required.push(( + field_name, + RawLiteralEnum::try_from(value, &field.field_type)?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "Can't convert null to required field", + )); + } + } else if let Some(value) = value { + optional.push(( + field_name, + Some(RawLiteralEnum::try_from(value, &field.field_type)?), + )); + } else { + optional.push((field_name, None)); + } + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a struct", ty), + )); + } + RawLiteralEnum::Record(Record { required, optional }) + } + Literal::List(list) => { + if let Type::List(list_ty) = ty { + let list = list + .into_iter() + .map(|v| { + v.map(|v| { + RawLiteralEnum::try_from(v, &list_ty.element_field.field_type) + }) + .transpose() + }) + .collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: list_ty.element_field.required, + }) + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a list", ty), + )); + } + } + Literal::Map(map) => { + if let Type::Map(map_ty) = ty { + if let Type::Primitive(PrimitiveType::String) = *map_ty.key_field.field_type + { + let mut raw = Vec::with_capacity(map.len()); + for (k, v) in map { + if let Literal::Primitive(PrimitiveLiteral::String(k)) = k { + raw.push(( + k, + v.map(|v| { + RawLiteralEnum::try_from( + v, + &map_ty.value_field.field_type, + ) + }) + .transpose()?, + )); + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + "literal type is inconsistent with type", + )); + } + } + RawLiteralEnum::StringMap(StringMap { + raw, + required: map_ty.value_field.required, + }) + } else { + let list = 
map.into_iter().map(|(k,v)| { + let raw_k = + RawLiteralEnum::try_from(k, &map_ty.key_field.field_type)?; + let raw_v = v + .map(|v| { + RawLiteralEnum::try_from(v, &map_ty.value_field.field_type) + }) + .transpose()?; + if map_ty.value_field.required { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ("value".to_string(), raw_v.ok_or_else(||Error::new(ErrorKind::DataInvalid, "Map value is required, value cannot be null"))?), + ], + optional: vec![], + }))) + } else { + Ok(Some(RawLiteralEnum::Record(Record { + required: vec![ + ("key".to_string(), raw_k), + ], + optional: vec![ + ("value".to_string(), raw_v) + ], + }))) + } + }).collect::<Result<_, Error>>()?; + RawLiteralEnum::List(List { + list, + required: true, + }) + } + } else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Type {} should be a map", ty), + )); + } + } + }; + Ok(raw) + } + + pub fn try_into(self, ty: &Type) -> Result<Option<Literal>, Error> { + let invalid_err = |v: &str| { + Error::new( + ErrorKind::DataInvalid, + format!("raw literal ({}) fail convert to {:?}", v, ty), + ) + }; + match self { + RawLiteralEnum::Null => Ok(None), + RawLiteralEnum::Boolean(v) => Ok(Some(Literal::bool(v))), + RawLiteralEnum::Int(v) => match ty { + Type::Primitive(PrimitiveType::Int) => Ok(Some(Literal::int(v))), + Type::Primitive(PrimitiveType::Date) => Ok(Some(Literal::date(v))), + _ => Err(invalid_err("int")), + }, + RawLiteralEnum::Long(v) => match ty { + Type::Primitive(PrimitiveType::Long) => Ok(Some(Literal::long(v))), + Type::Primitive(PrimitiveType::Time) => Ok(Some(Literal::time(v))), + Type::Primitive(PrimitiveType::Timestamp) => Ok(Some(Literal::timestamp(v))), + Type::Primitive(PrimitiveType::Timestamptz) => { + Ok(Some(Literal::timestamptz(v))) + } + _ => Err(invalid_err("long")), + }, + RawLiteralEnum::Float(v) => match ty { + Type::Primitive(PrimitiveType::Float) => Ok(Some(Literal::float(v))), + _ => Err(invalid_err("float")), + }, + 
RawLiteralEnum::Double(v) => match ty { + Type::Primitive(PrimitiveType::Double) => Ok(Some(Literal::double(v))), + _ => Err(invalid_err("double")), + }, + RawLiteralEnum::String(v) => match ty { + Type::Primitive(PrimitiveType::String) => Ok(Some(Literal::string(v))), + _ => Err(invalid_err("string")), + }, + // # TODO + // rust avro don't support deserialize any bytes representation now. + RawLiteralEnum::Bytes(_) => Err(invalid_err("bytes")), + RawLiteralEnum::List(v) => match ty { + Type::List(ty) => Ok(Some(Literal::List( + v.list + .into_iter() + .map(|v| { + if let Some(v) = v { + v.try_into(&ty.element_field.field_type) + } else { + Ok(None) + } + }) + .collect::<Result<_, Error>>()?, + ))), + Type::Map(map_ty) => { + let key_ty = map_ty.key_field.field_type.as_ref(); + let value_ty = map_ty.value_field.field_type.as_ref(); + let mut map = BTreeMap::new(); + for k_v in v.list { + // In deserialize, the element always be `Som`e. `None` will be represented + // as `Some(RawLiteral::Null)` + let k_v = k_v.ok_or_else(|| invalid_err("list"))?; + if let RawLiteralEnum::Record(Record { + required, + optional: _, + }) = k_v + { + if required.len() != 2 { + return Err(invalid_err("list")); + } + let mut key = None; + let mut value = None; + required.into_iter().for_each(|(k, v)| { + if k == "key" { + key = Some(v); + } else if k == "value" { + value = Some(v); + } + }); + match (key, value) { + (Some(k), Some(v)) => { + map.insert( + k.try_into(key_ty)? 
+ .ok_or_else(|| invalid_err("list"))?, + v.try_into(value_ty)?, + ); + } + _ => return Err(invalid_err("list")), + } + } else { + return Err(invalid_err("list")); + } + } + Ok(Some(Literal::Map(map))) + } + _ => Err(invalid_err("list")), + }, + RawLiteralEnum::Record(Record { + required, + optional: _, + }) => match ty { + Type::Struct(struct_ty) => { + let iters: Vec<(i32, Option<Literal>, String)> = required + .into_iter() + .map(|(field_name, value)| { + let field = struct_ty + .field_by_name(field_name.as_str()) + .ok_or_else(|| invalid_err("record"))?; + let value = value.try_into(&field.field_type)?; + Ok((field.id, value, field.name.clone())) + }) + .collect::<Result<_, Error>>()?; + Ok(Some(Literal::Struct(super::Struct::from_iter(iters)))) + } + Type::Map(map_ty) => { + if *map_ty.key_field.field_type != Type::Primitive(PrimitiveType::String) { + return Err(invalid_err("record")); + } + let mut map = BTreeMap::new(); + for (k, v) in required { + map.insert( + Literal::string(k), + v.try_into(&map_ty.value_field.field_type)?, + ); + } + Ok(Some(Literal::Map(map))) + } + _ => Err(invalid_err("record")), + }, + RawLiteralEnum::StringMap(_) => Err(invalid_err("string map")), Review Comment: Why is this invalid? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org