liurenjie1024 commented on code in PR #491: URL: https://github.com/apache/iceberg-rust/pull/491#discussion_r1707133092
########## crates/iceberg/src/spec/partition.rs: ########## @@ -117,22 +144,312 @@ pub struct UnboundPartitionField { } /// Unbound partition spec can be built without a schema and later bound to a schema. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] #[serde(rename_all = "kebab-case")] -#[builder(setter(prefix = "with"))] pub struct UnboundPartitionSpec { /// Identifier for PartitionSpec - #[builder(default, setter(strip_option))] pub spec_id: Option<i32>, /// Details of the partition spec - #[builder(setter(each(name = "with_unbound_partition_field")))] pub fields: Vec<UnboundPartitionField>, } impl UnboundPartitionSpec { /// Create unbound partition spec builer - pub fn builder() -> UnboundPartitionSpecBuilder { - UnboundPartitionSpecBuilder::default() + pub fn builder() -> PartitionSpecBuilder { + PartitionSpecBuilder::default() + } +} + +/// Create valid partition specs for a given schema. +#[derive(Debug, Default)] +pub struct PartitionSpecBuilder { + spec_id: Option<i32>, + last_assigned_field_id: i32, + fields: Vec<UnboundPartitionField>, +} + +impl PartitionSpecBuilder { + pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; + // Default partition spec id is only used for building `PartitionSpec`. + // When building unbound partition specs, the spec id is not set by default. + pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; + + /// Create a new partition spec builder with the given schema. + pub fn new() -> Self { + Self { + spec_id: None, + fields: vec![], + last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID, + } + } + + /// Set the last assigned field id for the partition spec. + /// This is useful when re-binding partition specs. 
+ pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self { Review Comment: I took a look at Java's code and didn't find any place where it ensures the uniqueness of `partition_id`, but I think you are right, we should follow the spec rather than blindly following the Java implementation. ########## crates/iceberg/src/spec/partition.rs: ########## @@ -44,22 +44,27 @@ pub struct PartitionField { pub transform: Transform, } +impl PartitionField { + /// To unbound partition field + pub fn into_unbound(self) -> UnboundPartitionField { + self.into() + } +} + /// Partition spec that defines how to produce a tuple of partition values from a record. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] Review Comment: I'm thinking about removing the `pub` modifier for `spec_id` and `fields`, what do you think? I think it's better to only allow read-only access once it is built. It doesn't have to be done in this PR; the builder discussion just reminds me that it's dangerous to allow users to construct a bound partition spec. 
########## crates/iceberg/src/spec/partition.rs: ########## @@ -136,6 +145,427 @@ impl UnboundPartitionSpec { } } +impl From<PartitionField> for UnboundPartitionField { + fn from(field: PartitionField) -> Self { + UnboundPartitionField { + source_id: field.source_id, + partition_id: Some(field.field_id), + name: field.name, + transform: field.transform, + } + } +} + +impl From<PartitionSpec> for UnboundPartitionSpec { + fn from(spec: PartitionSpec) -> Self { + UnboundPartitionSpec { + spec_id: Some(spec.spec_id), + fields: spec.fields.into_iter().map(Into::into).collect(), + } + } +} + +/// Create a new UnboundPartitionSpec +#[derive(Debug, Default)] +pub struct UnboundPartitionSpecBuilder { + spec_id: Option<i32>, + fields: Vec<UnboundPartitionField>, +} + +impl UnboundPartitionSpecBuilder { + /// Create a new partition spec builder with the given schema. + pub fn new() -> Self { + Self { + spec_id: None, + fields: vec![], + } + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec from an unbound partition field. + pub fn add_partition_field( + self, + source_id: i32, + target_name: impl ToString, + transformation: Transform, + ) -> Result<Self> { + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.to_string(), + transform: transformation, + }; + self.add_partition_field_internal(field) + } + + /// Add multiple partition fields to the partition spec. 
+ pub fn add_partition_fields( + self, + fields: impl IntoIterator<Item = UnboundPartitionField>, + ) -> Result<Self> { + let mut builder = self; + for field in fields { + builder = builder.add_partition_field_internal(field)?; + } + Ok(builder) + } + + fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + if let Some(partition_id) = field.partition_id { + self.check_partition_id_unique(partition_id)?; + } + self.fields.push(field); + Ok(self) + } + + /// Build the unbound partition spec. + pub fn build(self) -> UnboundPartitionSpec { + UnboundPartitionSpec { + spec_id: self.spec_id, + fields: self.fields, + } + } +} + +/// Create valid partition specs for a given schema. +#[derive(Debug)] +pub struct PartitionSpecBuilder<'a> { + spec_id: Option<i32>, + last_assigned_field_id: i32, + fields: Vec<UnboundPartitionField>, + schema: &'a Schema, +} + +impl<'a> PartitionSpecBuilder<'a> { + pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; + // Default partition spec id is only used for building `PartitionSpec`. + // When building unbound partition specs, the spec id is not set by default. + pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; + + /// Create a new partition spec builder with the given schema. + pub fn new(schema: &'a Schema) -> Self { + Self { + spec_id: None, + fields: vec![], + last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID, + schema, + } + } + + /// Create a new partition spec builder from an existing unbound partition spec. 
+ pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) -> Result<Self> { + let mut builder = Self::new(schema) + .with_spec_id(unbound.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID)); + + for field in unbound.fields { + builder = builder.add_unbound_field(field)?; + } + Ok(builder) + } + + /// Set the last assigned field id for the partition spec. + /// + /// Set this field when a new partition spec is created for an existing TableMetaData. + /// As `field_id` must be unique in V2 metadata, this should be set to + /// the highest field id used previously. + pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self { + self.last_assigned_field_id = last_assigned_field_id; + self + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec. + pub fn add_partition_field( + self, + source_name: impl ToString, + target_name: impl ToString, + transform: Transform, + ) -> Result<Self> { + let source_id = self + .schema + .field_by_name(source_name.to_string().as_str()) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot find source column with name: {} in schema", + source_name.to_string() + ), + ) + })? + .id; + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.to_string(), + transform, + }; + + self.add_unbound_field(field) + } + + /// Add a new partition field to the partition spec. + /// + /// If `partition_id` is set, it is used as the field id. + /// Otherwise, a new `field_id` is assigned. 
+ pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + Self::check_name_does_not_collide_with_schema(&field, self.schema)?; + Self::check_transform_compatibility(&field, self.schema)?; + if let Some(partition_id) = field.partition_id { + self.check_partition_id_unique(partition_id)?; + } + + self.fields.push(field); + Ok(self) + } + + /// Wrapper around `with_unbound_fields` to add multiple partition fields. + pub fn add_unbound_fields( + self, + fields: impl IntoIterator<Item = UnboundPartitionField>, + ) -> Result<Self> { + let mut builder = self; + for field in fields { + builder = builder.add_unbound_field(field)?; + } + Ok(builder) + } + + /// Build a bound partition spec with the given schema. + pub fn build(self) -> Result<PartitionSpec> { + let fields = Self::set_field_ids(self.fields, self.last_assigned_field_id)?; + Ok(PartitionSpec { + spec_id: self.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID), + fields, + }) + } + + fn set_field_ids( + fields: Vec<UnboundPartitionField>, + last_assigned_field_id: i32, + ) -> Result<Vec<PartitionField>> { + let mut last_assigned_field_id = last_assigned_field_id; + // Already assigned partition ids. If we see one of these during iteration, + // we skip it. 
+ let assigned_ids = fields + .iter() + .filter_map(|f| f.partition_id) + .collect::<std::collections::HashSet<_>>(); + + fn _check_overflow(last_assigned_field_id: i32) -> Result<()> { Review Comment: We can use [`i32::checked_add`](https://doc.rust-lang.org/std/primitive.i32.html#method.checked_add) ########## crates/iceberg/src/spec/partition.rs: ########## @@ -136,6 +145,427 @@ impl UnboundPartitionSpec { } } +impl From<PartitionField> for UnboundPartitionField { + fn from(field: PartitionField) -> Self { + UnboundPartitionField { + source_id: field.source_id, + partition_id: Some(field.field_id), + name: field.name, + transform: field.transform, + } + } +} + +impl From<PartitionSpec> for UnboundPartitionSpec { + fn from(spec: PartitionSpec) -> Self { + UnboundPartitionSpec { + spec_id: Some(spec.spec_id), + fields: spec.fields.into_iter().map(Into::into).collect(), + } + } +} + +/// Create a new UnboundPartitionSpec +#[derive(Debug, Default)] +pub struct UnboundPartitionSpecBuilder { + spec_id: Option<i32>, + fields: Vec<UnboundPartitionField>, +} + +impl UnboundPartitionSpecBuilder { + /// Create a new partition spec builder with the given schema. + pub fn new() -> Self { + Self { + spec_id: None, + fields: vec![], + } + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec from an unbound partition field. + pub fn add_partition_field( + self, + source_id: i32, + target_name: impl ToString, + transformation: Transform, + ) -> Result<Self> { + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.to_string(), + transform: transformation, + }; + self.add_partition_field_internal(field) + } + + /// Add multiple partition fields to the partition spec. 
+ pub fn add_partition_fields( + self, + fields: impl IntoIterator<Item = UnboundPartitionField>, + ) -> Result<Self> { + let mut builder = self; + for field in fields { + builder = builder.add_partition_field_internal(field)?; + } + Ok(builder) + } + + fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + if let Some(partition_id) = field.partition_id { + self.check_partition_id_unique(partition_id)?; + } + self.fields.push(field); + Ok(self) + } + + /// Build the unbound partition spec. + pub fn build(self) -> UnboundPartitionSpec { + UnboundPartitionSpec { + spec_id: self.spec_id, + fields: self.fields, + } + } +} + +/// Create valid partition specs for a given schema. +#[derive(Debug)] +pub struct PartitionSpecBuilder<'a> { + spec_id: Option<i32>, + last_assigned_field_id: i32, + fields: Vec<UnboundPartitionField>, + schema: &'a Schema, +} + +impl<'a> PartitionSpecBuilder<'a> { + pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; + // Default partition spec id is only used for building `PartitionSpec`. + // When building unbound partition specs, the spec id is not set by default. + pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; + + /// Create a new partition spec builder with the given schema. + pub fn new(schema: &'a Schema) -> Self { + Self { + spec_id: None, + fields: vec![], + last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID, + schema, + } + } + + /// Create a new partition spec builder from an existing unbound partition spec. + pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) -> Result<Self> { Review Comment: I'm not insisting on this, but maybe add a `bind` method for `UnboundPartitionSpec` looks better? 
########## crates/iceberg/src/spec/partition.rs: ########## @@ -44,22 +44,27 @@ pub struct PartitionField { pub transform: Transform, } +impl PartitionField { + /// To unbound partition field + pub fn into_unbound(self) -> UnboundPartitionField { + self.into() + } +} + /// Partition spec that defines how to produce a tuple of partition values from a record. -#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default, Builder)] +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone, Default)] Review Comment: Also we need to add a `schema_id` field? ########## crates/iceberg/src/spec/partition.rs: ########## @@ -136,6 +145,427 @@ impl UnboundPartitionSpec { } } +impl From<PartitionField> for UnboundPartitionField { + fn from(field: PartitionField) -> Self { + UnboundPartitionField { + source_id: field.source_id, + partition_id: Some(field.field_id), + name: field.name, + transform: field.transform, + } + } +} + +impl From<PartitionSpec> for UnboundPartitionSpec { + fn from(spec: PartitionSpec) -> Self { + UnboundPartitionSpec { + spec_id: Some(spec.spec_id), + fields: spec.fields.into_iter().map(Into::into).collect(), + } + } +} + +/// Create a new UnboundPartitionSpec +#[derive(Debug, Default)] +pub struct UnboundPartitionSpecBuilder { + spec_id: Option<i32>, + fields: Vec<UnboundPartitionField>, +} + +impl UnboundPartitionSpecBuilder { + /// Create a new partition spec builder with the given schema. + pub fn new() -> Self { + Self { + spec_id: None, + fields: vec![], + } + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec from an unbound partition field. 
+ pub fn add_partition_field( + self, + source_id: i32, + target_name: impl ToString, + transformation: Transform, + ) -> Result<Self> { + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.to_string(), + transform: transformation, + }; + self.add_partition_field_internal(field) + } + + /// Add multiple partition fields to the partition spec. + pub fn add_partition_fields( + self, + fields: impl IntoIterator<Item = UnboundPartitionField>, + ) -> Result<Self> { + let mut builder = self; + for field in fields { + builder = builder.add_partition_field_internal(field)?; + } + Ok(builder) + } + + fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + if let Some(partition_id) = field.partition_id { + self.check_partition_id_unique(partition_id)?; + } + self.fields.push(field); + Ok(self) + } + + /// Build the unbound partition spec. + pub fn build(self) -> UnboundPartitionSpec { + UnboundPartitionSpec { + spec_id: self.spec_id, + fields: self.fields, + } + } +} + +/// Create valid partition specs for a given schema. +#[derive(Debug)] +pub struct PartitionSpecBuilder<'a> { + spec_id: Option<i32>, + last_assigned_field_id: i32, + fields: Vec<UnboundPartitionField>, + schema: &'a Schema, +} + +impl<'a> PartitionSpecBuilder<'a> { + pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; + // Default partition spec id is only used for building `PartitionSpec`. + // When building unbound partition specs, the spec id is not set by default. + pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; + + /// Create a new partition spec builder with the given schema. 
+ pub fn new(schema: &'a Schema) -> Self { + Self { + spec_id: None, + fields: vec![], + last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID, + schema, + } + } + + /// Create a new partition spec builder from an existing unbound partition spec. + pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) -> Result<Self> { + let mut builder = Self::new(schema) + .with_spec_id(unbound.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID)); + + for field in unbound.fields { + builder = builder.add_unbound_field(field)?; + } + Ok(builder) + } + + /// Set the last assigned field id for the partition spec. + /// + /// Set this field when a new partition spec is created for an existing TableMetaData. + /// As `field_id` must be unique in V2 metadata, this should be set to + /// the highest field id used previously. + pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self { + self.last_assigned_field_id = last_assigned_field_id; + self + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec. + pub fn add_partition_field( + self, + source_name: impl ToString, + target_name: impl ToString, Review Comment: ```suggestion source_name: impl AsRef<str>, target_name: impl Into<String>, ``` `source_name` doesn't require an allocation. `Into<String>` can avoid an allocation when the input is already a `String`. 
########## crates/iceberg/src/spec/partition.rs: ########## @@ -136,6 +145,427 @@ impl UnboundPartitionSpec { } } +impl From<PartitionField> for UnboundPartitionField { + fn from(field: PartitionField) -> Self { + UnboundPartitionField { + source_id: field.source_id, + partition_id: Some(field.field_id), + name: field.name, + transform: field.transform, + } + } +} + +impl From<PartitionSpec> for UnboundPartitionSpec { + fn from(spec: PartitionSpec) -> Self { + UnboundPartitionSpec { + spec_id: Some(spec.spec_id), + fields: spec.fields.into_iter().map(Into::into).collect(), + } + } +} + +/// Create a new UnboundPartitionSpec +#[derive(Debug, Default)] +pub struct UnboundPartitionSpecBuilder { + spec_id: Option<i32>, + fields: Vec<UnboundPartitionField>, +} + +impl UnboundPartitionSpecBuilder { + /// Create a new partition spec builder with the given schema. + pub fn new() -> Self { + Self { + spec_id: None, + fields: vec![], + } + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec from an unbound partition field. + pub fn add_partition_field( + self, + source_id: i32, + target_name: impl ToString, + transformation: Transform, + ) -> Result<Self> { + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.to_string(), + transform: transformation, + }; + self.add_partition_field_internal(field) + } + + /// Add multiple partition fields to the partition spec. 
+ pub fn add_partition_fields( + self, + fields: impl IntoIterator<Item = UnboundPartitionField>, + ) -> Result<Self> { + let mut builder = self; + for field in fields { + builder = builder.add_partition_field_internal(field)?; + } + Ok(builder) + } + + fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> Result<Self> { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + if let Some(partition_id) = field.partition_id { + self.check_partition_id_unique(partition_id)?; + } + self.fields.push(field); + Ok(self) + } + + /// Build the unbound partition spec. + pub fn build(self) -> UnboundPartitionSpec { + UnboundPartitionSpec { + spec_id: self.spec_id, + fields: self.fields, + } + } +} + +/// Create valid partition specs for a given schema. +#[derive(Debug)] +pub struct PartitionSpecBuilder<'a> { + spec_id: Option<i32>, + last_assigned_field_id: i32, + fields: Vec<UnboundPartitionField>, + schema: &'a Schema, +} + +impl<'a> PartitionSpecBuilder<'a> { + pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999; + // Default partition spec id is only used for building `PartitionSpec`. + // When building unbound partition specs, the spec id is not set by default. + pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0; + + /// Create a new partition spec builder with the given schema. + pub fn new(schema: &'a Schema) -> Self { + Self { + spec_id: None, + fields: vec![], + last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID, + schema, + } + } + + /// Create a new partition spec builder from an existing unbound partition spec. 
+ pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) -> Result<Self> { + let mut builder = Self::new(schema) + .with_spec_id(unbound.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID)); + + for field in unbound.fields { + builder = builder.add_unbound_field(field)?; + } + Ok(builder) + } + + /// Set the last assigned field id for the partition spec. + /// + /// Set this field when a new partition spec is created for an existing TableMetaData. + /// As `field_id` must be unique in V2 metadata, this should be set to + /// the highest field id used previously. + pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) -> Self { + self.last_assigned_field_id = last_assigned_field_id; + self + } + + /// Set the spec id for the partition spec. + pub fn with_spec_id(mut self, spec_id: i32) -> Self { + self.spec_id = Some(spec_id); + self + } + + /// Add a new partition field to the partition spec. + pub fn add_partition_field( + self, + source_name: impl ToString, + target_name: impl ToString, + transform: Transform, + ) -> Result<Self> { + let source_id = self + .schema + .field_by_name(source_name.to_string().as_str()) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot find source column with name: {} in schema", + source_name.to_string() + ), + ) + })? + .id; + let field = UnboundPartitionField { + source_id, + partition_id: None, + name: target_name.to_string(), + transform, + }; + + self.add_unbound_field(field) + } + + /// Add a new partition field to the partition spec. + /// + /// If `partition_id` is set, it is used as the field id. + /// Otherwise, a new `field_id` is assigned. 
+ pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> Result<Self> { + self.check_name_set_and_unique(&field.name)?; + self.check_for_redundant_partitions(field.source_id, &field.transform)?; + Self::check_name_does_not_collide_with_schema(&field, self.schema)?; + Self::check_transform_compatibility(&field, self.schema)?; Review Comment: I would suggest moving this check to the `build` method for the following reasons: 1. Performance issue. I see that for every check we construct a `HashSet` of names, which may be quite slow. Also, we don't need to check this on every method call? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org