c-thiel commented on code in PR #491:
URL: https://github.com/apache/iceberg-rust/pull/491#discussion_r1714069854


##########
crates/iceberg/src/spec/partition.rs:
##########
@@ -136,6 +145,427 @@ impl UnboundPartitionSpec {
     }
 }
 
+impl From<PartitionField> for UnboundPartitionField {
+    fn from(field: PartitionField) -> Self {
+        UnboundPartitionField {
+            source_id: field.source_id,
+            partition_id: Some(field.field_id),
+            name: field.name,
+            transform: field.transform,
+        }
+    }
+}
+
+impl From<PartitionSpec> for UnboundPartitionSpec {
+    fn from(spec: PartitionSpec) -> Self {
+        UnboundPartitionSpec {
+            spec_id: Some(spec.spec_id),
+            fields: spec.fields.into_iter().map(Into::into).collect(),
+        }
+    }
+}
+
+/// Create a new UnboundPartitionSpec
+#[derive(Debug, Default)]
+pub struct UnboundPartitionSpecBuilder {
+    spec_id: Option<i32>,
+    fields: Vec<UnboundPartitionField>,
+}
+
+impl UnboundPartitionSpecBuilder {
+    /// Create a new partition spec builder with the given schema.
+    pub fn new() -> Self {
+        Self {
+            spec_id: None,
+            fields: vec![],
+        }
+    }
+
+    /// Set the spec id for the partition spec.
+    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
+        self.spec_id = Some(spec_id);
+        self
+    }
+
+    /// Add a new partition field to the partition spec from an unbound 
partition field.
+    pub fn add_partition_field(
+        self,
+        source_id: i32,
+        target_name: impl ToString,
+        transformation: Transform,
+    ) -> Result<Self> {
+        let field = UnboundPartitionField {
+            source_id,
+            partition_id: None,
+            name: target_name.to_string(),
+            transform: transformation,
+        };
+        self.add_partition_field_internal(field)
+    }
+
+    /// Add multiple partition fields to the partition spec.
+    pub fn add_partition_fields(
+        self,
+        fields: impl IntoIterator<Item = UnboundPartitionField>,
+    ) -> Result<Self> {
+        let mut builder = self;
+        for field in fields {
+            builder = builder.add_partition_field_internal(field)?;
+        }
+        Ok(builder)
+    }
+
+    fn add_partition_field_internal(mut self, field: UnboundPartitionField) -> 
Result<Self> {
+        self.check_name_set_and_unique(&field.name)?;
+        self.check_for_redundant_partitions(field.source_id, 
&field.transform)?;
+        if let Some(partition_id) = field.partition_id {
+            self.check_partition_id_unique(partition_id)?;
+        }
+        self.fields.push(field);
+        Ok(self)
+    }
+
+    /// Build the unbound partition spec.
+    pub fn build(self) -> UnboundPartitionSpec {
+        UnboundPartitionSpec {
+            spec_id: self.spec_id,
+            fields: self.fields,
+        }
+    }
+}
+
+/// Create valid partition specs for a given schema.
+#[derive(Debug)]
+pub struct PartitionSpecBuilder<'a> {
+    spec_id: Option<i32>,
+    last_assigned_field_id: i32,
+    fields: Vec<UnboundPartitionField>,
+    schema: &'a Schema,
+}
+
+impl<'a> PartitionSpecBuilder<'a> {
+    pub(crate) const UNPARTITIONED_LAST_ASSIGNED_ID: i32 = 999;
+    // Default partition spec id is only used for building `PartitionSpec`.
+    // When building unbound partition specs, the spec id is not set by 
default.
+    pub(crate) const DEFAULT_PARTITION_SPEC_ID: i32 = 0;
+
+    /// Create a new partition spec builder with the given schema.
+    pub fn new(schema: &'a Schema) -> Self {
+        Self {
+            spec_id: None,
+            fields: vec![],
+            last_assigned_field_id: Self::UNPARTITIONED_LAST_ASSIGNED_ID,
+            schema,
+        }
+    }
+
+    /// Create a new partition spec builder from an existing unbound partition 
spec.
+    pub fn new_from_unbound(unbound: UnboundPartitionSpec, schema: &'a Schema) 
-> Result<Self> {
+        let mut builder = Self::new(schema)
+            
.with_spec_id(unbound.spec_id.unwrap_or(Self::DEFAULT_PARTITION_SPEC_ID));
+
+        for field in unbound.fields {
+            builder = builder.add_unbound_field(field)?;
+        }
+        Ok(builder)
+    }
+
+    /// Set the last assigned field id for the partition spec.
+    ///
+    /// Set this field when a new partition spec is created for an existing 
TableMetaData.
+    /// As `field_id` must be unique in V2 metadata, this should be set to
+    /// the highest field id used previously.
+    pub fn with_last_assigned_field_id(mut self, last_assigned_field_id: i32) 
-> Self {
+        self.last_assigned_field_id = last_assigned_field_id;
+        self
+    }
+
+    /// Set the spec id for the partition spec.
+    pub fn with_spec_id(mut self, spec_id: i32) -> Self {
+        self.spec_id = Some(spec_id);
+        self
+    }
+
+    /// Add a new partition field to the partition spec.
+    pub fn add_partition_field(
+        self,
+        source_name: impl ToString,
+        target_name: impl ToString,
+        transform: Transform,
+    ) -> Result<Self> {
+        let source_id = self
+            .schema
+            .field_by_name(source_name.to_string().as_str())
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot find source column with name: {} in schema",
+                        source_name.to_string()
+                    ),
+                )
+            })?
+            .id;
+        let field = UnboundPartitionField {
+            source_id,
+            partition_id: None,
+            name: target_name.to_string(),
+            transform,
+        };
+
+        self.add_unbound_field(field)
+    }
+
+    /// Add a new partition field to the partition spec.
+    ///
+    /// If `partition_id` is set, it is used as the field id.
+    /// Otherwise, a new `field_id` is assigned.
+    pub fn add_unbound_field(mut self, field: UnboundPartitionField) -> 
Result<Self> {
+        self.check_name_set_and_unique(&field.name)?;
+        self.check_for_redundant_partitions(field.source_id, 
&field.transform)?;
+        Self::check_name_does_not_collide_with_schema(&field, self.schema)?;
+        Self::check_transform_compatibility(&field, self.schema)?;

Review Comment:
   I prefer the ergonomy of my current approach, as it enables us to exactly 
pin down which field created the error. I like to fail fast if I can.
   Performance-wise I got rid of the HashSet but still need to loop over names.
   I don't think this has any measurable impact, especially considering that 
the amount of partition fields is typically going to be very small.
   
   If you still think performance could be an issue, we could increase the 
state with a `partition_names: HashSet<String>` field - which cost us one copy 
of each name though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to