liurenjie1024 commented on code in PR #587:
URL: https://github.com/apache/iceberg-rust/pull/587#discussion_r1843722934


##########
crates/iceberg/src/spec/table_metadata_builder.rs:
##########
@@ -0,0 +1,2097 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+    BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, 
Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, 
TableMetadata,
+    UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, 
MAIN_BRANCH, ONE_MINUTE_MS,
+    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, 
PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+    RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are 
applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as  
a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the 
corresponding update
+/// is omitted from `changes`.
+///
+/// Unlike a typical builder pattern, the order of function calls matters.
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added 
first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+    metadata: TableMetadata,
+    changes: Vec<TableUpdate>,
+    last_added_schema_id: Option<i32>,
+    last_added_spec_id: Option<i32>,
+    last_added_order_id: Option<i64>,
+    // None if this is a new table, i.e. the `new_from_metadata` method was not used
+    previous_history_entry: Option<MetadataLog>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `TableMetadata`.
+pub struct TableMetadataBuildResult {
+    /// The new `TableMetadata`.
+    pub metadata: TableMetadata,
+    /// The changes that were applied to the metadata.
+    pub changes: Vec<TableUpdate>,
+    /// Expired metadata logs
+    pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+    /// Proxy value for "last added" items
+    pub const LAST_ADDED: i32 = -1;
+
+    /// Create a `TableMetadata` object from scratch.
+    ///
+    /// This method re-assigns the ids of fields in the schema, schema.id, 
sort_order.id and
+    /// spec.id. It should only be used to create new table metadata from 
scratch.
+    pub fn new(
+        schema: Schema,
+        spec: impl Into<UnboundPartitionSpec>,
+        sort_order: SortOrder,
+        location: String,
+        format_version: FormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new 
table.
+        let (fresh_schema, fresh_spec, fresh_sort_order) =
+            Self::reassign_ids(schema, spec.into(), sort_order)?;
+        let schema_id = fresh_schema.schema_id();
+
+        let builder = Self {
+            metadata: TableMetadata {
+                format_version,
+                table_uuid: Uuid::now_v7(),
+                location: "".to_string(), // Overwritten immediately by 
set_location
+                last_sequence_number: 0,
+                last_updated_ms: 0,    // Overwritten by build() if not set 
before
+                last_column_id: -1,    // Overwritten immediately by 
add_current_schema
+                current_schema_id: -1, // Overwritten immediately by 
add_current_schema
+                schemas: HashMap::new(),
+                partition_specs: HashMap::new(),
+                default_spec: Arc::new(
+                    // The spec id (-1) is just a proxy value and can be any 
negative number.
+                    // 0 would lead to wrong changes in the builder if the 
provided spec by the user is
+                    // also unpartitioned.
+                    // The `default_spec` value is always replaced at the end 
of this method by the `add_default_partition_spec`
+                    // method.
+                    
BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1),
+                ), // Overwritten immediately by add_default_partition_spec
+                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+                properties: HashMap::new(),
+                current_snapshot_id: None,
+                snapshots: HashMap::new(),
+                snapshot_log: vec![],
+                sort_orders: HashMap::new(),
+                metadata_log: vec![],
+                default_sort_order_id: -1, // Overwritten immediately by 
add_default_sort_order
+                refs: HashMap::default(),
+            },
+            changes: vec![],
+            last_added_schema_id: Some(schema_id),
+            last_added_spec_id: None,
+            last_added_order_id: None,
+            previous_history_entry: None,
+        };
+
+        builder
+            .set_location(location)
+            .add_current_schema(fresh_schema)?
+            .add_default_partition_spec(fresh_spec.into_unbound())?
+            .add_default_sort_order(fresh_sort_order)?
+            .set_properties(properties)
+    }
+
+    /// Creates a new table metadata builder from the given metadata to modify 
it.
+
+    /// `previous_file_location` is the location where the current version
+    /// of the metadata file is stored. This is used to update the metadata 
log.
+    /// If `previous_file_location` is `None`, the metadata log will not be 
updated.
+    /// This should only be used to stage-create tables.
+    #[must_use]
+    pub fn new_from_metadata(
+        previous: TableMetadata,
+        previous_file_location: Option<String>,
+    ) -> Self {
+        Self {
+            previous_history_entry: previous_file_location.map(|l| MetadataLog 
{
+                metadata_file: l,
+                timestamp_ms: previous.last_updated_ms,
+            }),
+            metadata: previous,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_spec_id: None,
+            last_added_order_id: None,
+        }
+    }
+
+    /// Creates a new table metadata builder from the given table creation.
+    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
+        let TableCreation {
+            name: _,
+            location,
+            schema,
+            partition_spec,
+            sort_order,
+            properties,
+        } = table_creation;
+
+        let location = location.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't create table without location",
+            )
+        })?;
+        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
+            spec_id: None,
+            fields: vec![],
+        });
+
+        Self::new(
+            schema,
+            partition_spec,
+            sort_order.unwrap_or(SortOrder::unsorted_order()),
+            location,
+            FormatVersion::V1,
+            properties,
+        )
+    }
+
+    /// Get the current schema with all changes applied up to this point.
+    #[inline]
+    pub fn current_schema(&self) -> &SchemaRef {
+        self.metadata.current_schema()
+    }
+
+    /// Get the current last column id
+    #[inline]
+    pub fn last_column_id(&self) -> i32 {
+        self.metadata.last_column_id
+    }
+
+    /// Get the current last updated timestamp
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.metadata.last_updated_ms
+    }

Review Comment:
   I think we should make all three of these methods private.



##########
crates/iceberg/src/spec/table_metadata_builder.rs:
##########
@@ -0,0 +1,2097 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+    BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, 
Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, 
TableMetadata,
+    UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, 
MAIN_BRANCH, ONE_MINUTE_MS,
+    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, 
PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+    RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are 
applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as  
a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the 
corresponding update
+/// is omitted from `changes`.
+///
+/// Unlike a typical builder pattern, the order of function calls matters.
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added 
first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+    metadata: TableMetadata,
+    changes: Vec<TableUpdate>,
+    last_added_schema_id: Option<i32>,
+    last_added_spec_id: Option<i32>,
+    last_added_order_id: Option<i64>,
+    // None if this is a new table, i.e. the `new_from_metadata` method was not used
+    previous_history_entry: Option<MetadataLog>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `TableMetadata`.
+pub struct TableMetadataBuildResult {
+    /// The new `TableMetadata`.
+    pub metadata: TableMetadata,
+    /// The changes that were applied to the metadata.
+    pub changes: Vec<TableUpdate>,
+    /// Expired metadata logs
+    pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+    /// Proxy value for "last added" items

Review Comment:
   ```suggestion
       /// Proxy id for "last added" items, including schema, partition spec, 
sort order.
   ```



##########
crates/iceberg/src/spec/table_metadata_builder.rs:
##########
@@ -0,0 +1,2074 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+    BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, 
Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, 
TableMetadata,
+    UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, 
MAIN_BRANCH, ONE_MINUTE_MS,
+    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, 
PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+    RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are 
applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as  
a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the 
corresponding update
+/// is omitted from `changes`.
+///
+/// Unlike a typical builder pattern, the order of function calls matters.
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added 
first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+    metadata: TableMetadata,
+    changes: Vec<TableUpdate>,
+    last_added_schema_id: Option<i32>,
+    last_added_spec_id: Option<i32>,
+    last_added_order_id: Option<i64>,
+    // None if this is a new table, i.e. the `new_from_metadata` method was not used
+    previous_history_entry: Option<MetadataLog>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `TableMetadata`.
+pub struct TableMetadataBuildResult {
+    /// The new `TableMetadata`.
+    pub metadata: TableMetadata,
+    /// The changes that were applied to the metadata.
+    pub changes: Vec<TableUpdate>,
+    /// Expired metadata logs
+    pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+    const LAST_ADDED: i32 = -1;
+
+    /// Create a `TableMetadata` object from scratch.
+    ///
+    /// This method re-assigns the ids of fields in the schema, schema.id, 
sort_order.id and
+    /// spec.id. It should only be used to create new table metadata from 
scratch.
+    pub fn new(
+        schema: Schema,
+        spec: impl Into<UnboundPartitionSpec>,
+        sort_order: SortOrder,
+        location: String,
+        format_version: FormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new 
table.
+        let (fresh_schema, fresh_spec, fresh_sort_order) =
+            Self::reassign_ids(schema, spec.into(), sort_order)?;
+        let schema_id = fresh_schema.schema_id();
+
+        let builder = Self {
+            metadata: TableMetadata {
+                format_version,
+                table_uuid: Uuid::now_v7(),
+                location: "".to_string(), // Overwritten immediately by 
set_location
+                last_sequence_number: 0,
+                last_updated_ms: 0,    // Overwritten by build() if not set 
before
+                last_column_id: -1,    // Overwritten immediately by 
add_current_schema
+                current_schema_id: -1, // Overwritten immediately by 
add_current_schema
+                schemas: HashMap::new(),
+                partition_specs: HashMap::new(),
+                default_spec: Arc::new(
+                    
BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1),
+                ), // Overwritten immediately by add_default_partition_spec
+                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+                properties: HashMap::new(),
+                current_snapshot_id: None,
+                snapshots: HashMap::new(),
+                snapshot_log: vec![],
+                sort_orders: HashMap::new(),
+                metadata_log: vec![],
+                default_sort_order_id: -1, // Overwritten immediately by 
add_default_sort_order
+                refs: HashMap::default(),
+            },
+            changes: vec![],
+            last_added_schema_id: Some(schema_id),
+            last_added_spec_id: None,
+            last_added_order_id: None,
+            previous_history_entry: None,
+        };
+
+        builder
+            .set_location(location)
+            .add_current_schema(fresh_schema)?
+            .add_default_partition_spec(fresh_spec.into_unbound())?
+            .add_default_sort_order(fresh_sort_order)?
+            .set_properties(properties)
+    }
+
+    /// Creates a new table metadata builder from the given metadata to modify 
it.
+
+    /// `previous_file_location` is the location where the current version
+    /// of the metadata file is stored. This is used to update the metadata 
log.
+    /// If `previous_file_location` is `None`, the metadata log will not be 
updated.
+    /// This should only be used to stage-create tables.
+    #[must_use]
+    pub fn new_from_metadata(
+        previous: TableMetadata,
+        previous_file_location: Option<String>,
+    ) -> Self {
+        Self {
+            previous_history_entry: previous_file_location.map(|l| MetadataLog 
{
+                metadata_file: l,
+                timestamp_ms: previous.last_updated_ms,
+            }),
+            metadata: previous,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_spec_id: None,
+            last_added_order_id: None,
+        }
+    }
+
+    /// Creates a new table metadata builder from the given table creation.
+    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
+        let TableCreation {
+            name: _,
+            location,
+            schema,
+            partition_spec,
+            sort_order,
+            properties,
+        } = table_creation;
+
+        let location = location.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't create table without location",
+            )
+        })?;
+        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
+            spec_id: None,
+            fields: vec![],
+        });
+
+        Self::new(
+            schema,
+            partition_spec,
+            sort_order.unwrap_or(SortOrder::unsorted_order()),
+            location,
+            FormatVersion::V1,
+            properties,
+        )
+    }
+
+    /// Get the current schema with all changes applied up to this point.
+    #[inline]
+    pub fn current_schema(&self) -> &SchemaRef {
+        self.metadata.current_schema()
+    }
+
+    /// Get the current last column id
+    #[inline]
+    pub fn last_column_id(&self) -> i32 {
+        self.metadata.last_column_id
+    }
+
+    /// Get the current last updated timestamp
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.metadata.last_updated_ms
+    }
+
+    /// Changes uuid of table metadata.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+        if self.metadata.table_uuid != uuid {
+            self.metadata.table_uuid = uuid;
+            self.changes.push(TableUpdate::AssignUuid { uuid });
+        }
+
+        self
+    }
+
+    /// Upgrade `FormatVersion`. Downgrades are not allowed.
+    ///
+    /// # Errors
+    /// - Cannot downgrade to older format versions.
+    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> 
Result<Self> {
+        if format_version < self.metadata.format_version {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot downgrade FormatVersion from {} to {}",
+                    self.metadata.format_version, format_version
+                ),
+            ));
+        }
+
+        if format_version != self.metadata.format_version {
+            self.metadata.format_version = format_version;
+            self.changes
+                .push(TableUpdate::UpgradeFormatVersion { format_version });
+        }
+
+        Ok(self)
+    }
+
+    /// Set properties. If a property already exists, it will be overwritten.
+    ///
+    /// If a reserved property is set, the corresponding action is performed 
and the property is not persisted.
+    /// Currently the following reserved properties are supported:
+    /// * format-version: Set the format version of the table.
+    ///
+    /// # Errors
+    /// - If format-version property is set to a lower version than the 
current format version.
+    pub fn set_properties(mut self, properties: HashMap<String, String>) -> 
Result<Self> {
+        // List of specified properties that are RESERVED and should not be 
persisted.
+        let reserved_properties = properties
+            .keys()
+            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+
+        if !reserved_properties.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Table properties should not contain reserved properties, 
but got: [{}]",
+                    reserved_properties.join(", ")
+                ),
+            ));
+        }
+
+        if properties.is_empty() {
+            return Ok(self);
+        }
+
+        self.metadata.properties.extend(properties.clone());
+        self.changes.push(TableUpdate::SetProperties {
+            updates: properties,
+        });
+
+        Ok(self)
+    }
+
+    /// Remove properties from the table metadata.
+    /// Does nothing if the key is not present.
+    pub fn remove_properties(mut self, properties: &[String]) -> Self {
+        for property in properties {
+            self.metadata.properties.remove(property);
+        }
+
+        if !properties.is_empty() {
+            self.changes.push(TableUpdate::RemoveProperties {
+                removals: properties.to_vec(),
+            });
+        }
+
+        self
+    }
+
+    /// Set the location of the table metadata, stripping any trailing slashes.
+    pub fn set_location(mut self, location: String) -> Self {
+        let location = location.trim_end_matches('/').to_string();
+        if self.metadata.location != location {
+            self.changes.push(TableUpdate::SetLocation {
+                location: location.clone(),
+            });
+            self.metadata.location = location;
+        }
+
+        self
+    }
+
+    /// Add a snapshot to the table metadata.
+    ///
+    /// # Errors
+    /// - No schema has been added to the table metadata.
+    /// - No partition spec has been added to the table metadata.
+    /// - No sort order has been added to the table metadata.
+    /// - Snapshot id already exists.
+    /// - For format version > 1: the sequence number of the snapshot is lower 
than the highest sequence number specified so far.
+    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
+        if self.metadata.partition_specs.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Attempting to add a snapshot before a partition spec is 
added",
+            ));
+        }
+
+        if self.metadata.sort_orders.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Attempting to add a snapshot before a sort order is added",
+            ));
+        }
+
+        if self
+            .metadata
+            .snapshots
+            .contains_key(&snapshot.snapshot_id())
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Snapshot already exists for: '{}'", 
snapshot.snapshot_id()),
+            ));
+        }
+
+        if self.metadata.format_version != FormatVersion::V1
+            && snapshot.sequence_number() <= self.metadata.last_sequence_number
+            && snapshot.parent_snapshot_id().is_some()
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add snapshot with sequence number {} older than 
last sequence number {}",
+                    snapshot.sequence_number(),
+                    self.metadata.last_sequence_number
+                )
+            ));
+        }
+
+        if let Some(last) = self.metadata.snapshot_log.last() {
+            // commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew
+            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid snapshot timestamp {}: before last snapshot 
timestamp {}",
+                        snapshot.timestamp_ms(),
+                        last.timestamp_ms
+                    ),
+                ));
+            }
+        }
+
+        if snapshot.timestamp_ms() - self.metadata.last_updated_ms < 
-ONE_MINUTE_MS {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Invalid snapshot timestamp {}: before last updated 
timestamp {}",
+                    snapshot.timestamp_ms(),
+                    self.metadata.last_updated_ms
+                ),
+            ));
+        }
+
+        // Mutation happens in next line - must be infallible from here
+        self.changes.push(TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        });
+
+        self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        self.metadata.last_sequence_number = snapshot.sequence_number();
+        self.metadata
+            .snapshots
+            .insert(snapshot.snapshot_id(), snapshot.into());
+
+        Ok(self)
+    }
+
+    /// Append a snapshot to the specified branch.
+    /// If branch is not specified, the snapshot is appended to the main 
branch.
+    /// The `ref` must already exist. Retention settings from the `ref` are 
re-used.
+    ///
+    /// # Errors
+    /// - The ref is unknown.
+    /// - Any of the preconditions of `self.add_snapshot` are not met.
+    pub fn append_snapshot(self, snapshot: Snapshot, ref_name: Option<&str>) 
-> Result<Self> {
+        let ref_name = ref_name.unwrap_or(MAIN_BRANCH);
+        let mut reference = self
+            .metadata
+            .refs
+            .get(ref_name)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Cannot append snapshot to unknown ref: '{}'", 
ref_name),
+                )
+            })?
+            .clone();
+
+        reference.snapshot_id = snapshot.snapshot_id();
+
+        self.add_snapshot(snapshot)?.set_ref(ref_name, reference)
+    }
+
+    /// Remove snapshots by its ids from the table metadata.
+    /// Does nothing if a snapshot id is not present.
+    /// Keeps as changes only the snapshots that were actually removed.
+    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
+        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
+
+        self.metadata.snapshots.retain(|k, _| {
+            if snapshot_ids.contains(k) {
+                removed_snapshots.push(*k);
+                false
+            } else {
+                true
+            }
+        });
+
+        if !removed_snapshots.is_empty() {
+            self.changes.push(TableUpdate::RemoveSnapshots {
+                snapshot_ids: removed_snapshots,
+            });
+        }
+
+        // Remove refs that are no longer valid
+        self.metadata
+            .refs
+            .retain(|_, v| 
self.metadata.snapshots.contains_key(&v.snapshot_id));
+
+        self
+    }
+
+    /// Set a reference to a snapshot.
+    ///
+    /// # Errors
+    /// - The snapshot id is unknown.
+    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> 
Result<Self> {
+        if self
+            .metadata
+            .refs
+            .get(ref_name)
+            .is_some_and(|snap_ref| snap_ref.eq(&reference))
+        {
+            return Ok(self);
+        }
+
+        let Some(snapshot) = 
self.metadata.snapshots.get(&reference.snapshot_id) else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
+                    reference.snapshot_id
+                ),
+            ));
+        };
+
+        // Update last_updated_ms to the exact timestamp of the snapshot if it 
was added in this commit
+        let is_added_snapshot = self.changes.iter().any(|update| {
+            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if 
snap.snapshot_id() == snapshot.snapshot_id())
+        });
+        if is_added_snapshot {
+            self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        }
+
+        // Current snapshot id is set only for the main branch
+        if ref_name == MAIN_BRANCH {
+            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
+            if self.metadata.last_updated_ms == i64::default() {
+                self.metadata.last_updated_ms = 
chrono::Utc::now().timestamp_millis();
+            };
+
+            self.metadata.snapshot_log.push(SnapshotLog {
+                snapshot_id: snapshot.snapshot_id(),
+                timestamp_ms: self.metadata.last_updated_ms,
+            });
+        }
+
+        self.changes.push(TableUpdate::SetSnapshotRef {
+            ref_name: ref_name.to_string(),
+            reference: reference.clone(),
+        });
+        self.metadata.refs.insert(ref_name.to_string(), reference);
+
+        Ok(self)
+    }
+
+    /// Remove a reference
+    ///
+    /// If `ref_name='main'` the current snapshot id is set to -1.
+    pub fn remove_ref(mut self, ref_name: &str) -> Self {
+        if ref_name == MAIN_BRANCH {
+            self.metadata.current_snapshot_id = 
Some(i64::from(Self::LAST_ADDED));
+            self.metadata.snapshot_log.clear();
+        }
+
+        if self.metadata.refs.remove(ref_name).is_some() || ref_name == 
MAIN_BRANCH {
+            self.changes.push(TableUpdate::RemoveSnapshotRef {
+                ref_name: ref_name.to_string(),
+            });
+        }
+
+        self
+    }
+
+    /// Add a schema to the table metadata.
+    ///
+    /// The provided `schema.schema_id` may not be used.
+
+    // ToDo Discuss: Should we add `new_last_column_id` argument?
+    // TLDR; I believe not as it acts as an assertion and its purpose (and 
source) is not clear. We shouldn't add it.
+    //
+    // Schemas can contain only old columns or a mix of old and new columns.
+    // In Java, if `new_last_column_id` set but too low, the function would 
fail, basically hinting at
+    // the schema having been built for an older metadata version. 
`new_last_column_id` is typically obtained
+    // in the schema building process.
+    //
+    // This assertion is not required if the user controls the flow - he knows 
for which
+    // metadata he created a schema. If asserting the `new_last_column_id` was 
semantically important, it should be part of the schema and
+    // not be passed around alongside it.
+    //
+    // Specifying `new_last_column_id` in java also allows to set 
`metadata.last_column_id` to any arbitrary value
+    // even if it's not present as a column. I believe this to be undesired 
behavior. This is not possible with the current Rust interface.
+    //
+    // If the schema is built out of sync with the TableMetadata, for example 
in a REST Catalog setting, the assertion of
+    // the provided `last_column_id` as part of the `TableUpdate::AddSchema` 
is still done in its `.apply` method.
+    pub fn add_schema(mut self, schema: Schema) -> Self {
+        // fn returns a result because I think we should check field-id <-> 
type compatibility if the field-id
+        // is still present in the metadata. This is not done in the Java code.
+        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
+        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);
+
+        if schema_found {
+            // ToDo Discuss: The Java code is a bit convoluted and I think it 
might be wrong for an edge case.
+            // Why is it wrong: The baseline is, that if something changes the 
state of the builder, it has an effect on it and
+            // must be recorded in the changes.
+            // The Java code might or might not change `lastAddedSchemaId`, 
and does not record this change in `changes`.
+            // Thus, replaying the changes, would lead to a different result 
if a schema is added twice in unfavorable
+            // conditions.
+            // Here we do it differently, but a check from a Java maintainer 
would be nice.
+            if self.last_added_schema_id != Some(new_schema_id) {
+                self.changes.push(TableUpdate::AddSchema {
+                    last_column_id: Some(self.metadata.last_column_id),
+                    schema: schema.clone(),
+                });
+                self.last_added_schema_id = Some(new_schema_id);
+            }
+
+            return self;
+        }
+
+        // New schemas might contain only old columns. In this case 
last_column_id should not be
+        // reduced.
+        self.metadata.last_column_id =
+            std::cmp::max(self.metadata.last_column_id, 
schema.highest_field_id());
+
+        // Set schema-id
+        let schema = match new_schema_id == schema.schema_id() {
+            true => schema,
+            false => schema.with_schema_id(new_schema_id),
+        };
+
+        self.metadata
+            .schemas
+            .insert(new_schema_id, schema.clone().into());
+
+        self.changes.push(TableUpdate::AddSchema {
+            schema,
+            last_column_id: Some(self.metadata.last_column_id),
+        });
+
+        self.last_added_schema_id = Some(new_schema_id);
+
+        self
+    }
+
+    /// Set the current schema id.
+    ///
+    /// Errors:
+    /// - provided `schema_id` is -1 but no schema has been added via 
`add_schema`.
+    /// - No schema with the provided `schema_id` exists.
+    pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
+        if schema_id == Self::LAST_ADDED {
+            schema_id = self.last_added_schema_id.ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Cannot set current schema to last added schema: no schema 
has been added.",
+                )
+            })?;
+        };
+        let schema_id = schema_id; // Make immutable
+
+        if schema_id == self.metadata.current_schema_id {
+            return Ok(self);
+        }
+
+        let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot set current schema to unknown schema with id: 
'{}'",
+                    schema_id
+                ),
+            )
+        })?;
+
+        // Old partition specs and sort-orders should be preserved even if 
they are not compatible with the new schema,
+        // so that older metadata can still be interpreted.
+        // Default partition spec and sort order are checked in the build() 
method
+        // which allows other default partition specs and sort orders to be 
set before the build.
+
+        self.metadata.current_schema_id = schema_id;
+
+        if self.last_added_schema_id == Some(schema_id) {
+            self.changes.push(TableUpdate::SetCurrentSchema {
+                schema_id: Self::LAST_ADDED,
+            });
+        } else {
+            self.changes
+                .push(TableUpdate::SetCurrentSchema { schema_id });
+        }
+
+        Ok(self)
+    }
+
+    /// Add a schema and set it as the current schema.
+    pub fn add_current_schema(self, schema: Schema) -> Result<Self> {
+        self.add_schema(schema).set_current_schema(Self::LAST_ADDED)
+    }
+
+    /// Add a partition spec to the table metadata.
+    ///
+    /// The spec is bound eagerly to the current schema.
+    /// If a schema is added in the same set of changes, the schema should be 
added first.
+    ///
+    /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be 
used.
+    ///
+    /// # Errors
+    /// - The partition spec cannot be bound to the current schema.
+    /// - The partition spec has non-sequential field ids and the table format 
version is 1.
+    pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> 
Result<Self> {
+        let schema = self.get_current_schema()?.clone();
+        let spec = 
PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)?
+            .with_last_assigned_field_id(self.metadata.last_partition_id)
+            .build()?;
+
+        let new_spec_id = self.reuse_or_create_new_spec_id(&spec);
+        let spec_found = 
self.metadata.partition_specs.contains_key(&new_spec_id);
+        let spec = spec.with_spec_id(new_spec_id);
+        let unbound_spec = unbound_spec.with_spec_id(new_spec_id);
+
+        if spec_found {
+            if self.last_added_spec_id != Some(new_spec_id) {
+                self.changes
+                    .push(TableUpdate::AddSpec { spec: unbound_spec });
+                self.last_added_spec_id = Some(new_spec_id);
+            }
+
+            return Ok(self);
+        }
+
+        if self.metadata.format_version <= FormatVersion::V1 && 
!spec.has_sequential_ids() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Cannot add partition spec with non-sequential field ids to 
format version 1 table",
+            ));
+        }
+
+        let highest_field_id = spec
+            .highest_field_id()
+            .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID);
+        self.metadata
+            .partition_specs
+            .insert(new_spec_id, Arc::new(spec.into()));
+        self.changes
+            .push(TableUpdate::AddSpec { spec: unbound_spec });
+
+        self.last_added_spec_id = Some(new_spec_id);
+        self.metadata.last_partition_id =
+            std::cmp::max(self.metadata.last_partition_id, highest_field_id);
+
+        Ok(self)
+    }
+
+    /// Set the default partition spec.
+    ///
+    /// # Errors
+    /// - spec_id is -1 but no spec has been added via `add_partition_spec`.
+    /// - No partition spec with the provided `spec_id` exists.
+    pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> 
Result<Self> {
+        if spec_id == Self::LAST_ADDED {
+            spec_id = self.last_added_spec_id.ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Cannot set default partition spec to last added spec: no 
spec has been added.",
+                )
+            })?;
+        }
+
+        if self.metadata.default_spec.spec_id() == spec_id {
+            return Ok(self);
+        }
+
+        if !self.metadata.partition_specs.contains_key(&spec_id) {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Cannot set default partition spec to unknown spec 
with id: '{spec_id}'",),
+            ));
+        }
+
+        let schemaless_spec =
+            self.metadata
+                .partition_specs
+                .get(&spec_id)
+                .ok_or_else(|| {
+                    Error::new(
+                ErrorKind::DataInvalid,
+                format!("Cannot set default partition spec to unknown spec 
with id: '{spec_id}'",),
+            )
+                })?
+                .clone();
+        let spec =
+            
Arc::unwrap_or_clone(schemaless_spec).bind(self.get_current_schema()?.clone())?;
+        self.metadata.default_spec = Arc::new(spec);
+
+        if self.last_added_spec_id == Some(spec_id) {
+            self.changes.push(TableUpdate::SetDefaultSpec {
+                spec_id: Self::LAST_ADDED,
+            });
+        } else {
+            self.changes.push(TableUpdate::SetDefaultSpec { spec_id });
+        }
+
+        Ok(self)
+    }
+
+    /// Add a partition spec and set it as the default
+    pub fn add_default_partition_spec(self, unbound_spec: 
UnboundPartitionSpec) -> Result<Self> {
+        self.add_partition_spec(unbound_spec)?
+            .set_default_partition_spec(Self::LAST_ADDED)
+    }
+
+    /// Add a sort order to the table metadata.
+    ///
+    /// The spec is bound eagerly to the current schema and must be valid for 
it.
+    /// If a schema is added in the same set of changes, the schema should be 
added first.
+    ///
+    /// Even if `sort_order.order_id` is provided, it may not be used.
+    ///
+    /// # Errors
+    /// - Sort order id to add already exists.
+    /// - Sort order is incompatible with the current schema.
+    pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> {
+        let new_order_id = self.reuse_or_create_new_sort_id(&sort_order);
+        let sort_order_found = 
self.metadata.sort_orders.contains_key(&new_order_id);
+
+        if sort_order_found {
+            if self.last_added_order_id != Some(new_order_id) {
+                self.changes.push(TableUpdate::AddSortOrder {
+                    sort_order: sort_order.clone(),
+                });
+                self.last_added_order_id = Some(new_order_id);
+            }
+
+            return Ok(self);
+        }
+
+        // ToDo Discuss: Java builds a fresh spec here:

Review Comment:
   1. I agree that we should keep `add_schema` *for now*. 
   2. I agree that we should implement the issue proposed by @Fokko, which 
will be part of our table transaction API.
   3. I don't think we should discourage the use of `add_schema`, since they 
have different responsibilities. `TableMetadataBuilder` does some necessary 
checks to avoid broken table metadata, but it doesn't guarantee that behavior. 
We should be careful with our transaction, which should guarantee that it 
produces valid table metadata.
   
   In fact, I'm thinking about making all `TableMetadataBuilder` methods 
crate-private. Java makes them public since Java's access modifiers are limited 
and don't support complex path visibility like Rust's. But I think these methods 
should only be called by the transaction API or when applying a `TableUpdate`. 
What do you think?



##########
crates/iceberg/src/spec/table_metadata_builder.rs:
##########
@@ -0,0 +1,2070 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+    BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, 
Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, 
TableMetadata,
+    UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, 
MAIN_BRANCH, ONE_MINUTE_MS,
+    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, 
PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+    RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are 
applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as  
a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the 
corresponding update
+/// is omitted from `changes`.
+///
+/// Unlike a typical builder pattern, the order of function calls matters.
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added 
first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+    metadata: TableMetadata,
+    changes: Vec<TableUpdate>,
+    last_added_schema_id: Option<i32>,
+    last_added_spec_id: Option<i32>,
+    last_added_order_id: Option<i64>,
+    // None if this is a new table (from_metadata) method not used
+    previous_history_entry: Option<MetadataLog>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `TableMetadata`.
+pub struct TableMetadataBuildResult {
+    /// The new `TableMetadata`.
+    pub metadata: TableMetadata,
+    /// The changes that were applied to the metadata.
+    pub changes: Vec<TableUpdate>,
+    /// Expired metadata logs
+    pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+    const LAST_ADDED: i32 = -1;
+
+    /// Create a `TableMetadata` object from scratch.
+    ///
+    /// This method re-assign ids of fields in the schema, schema.id, 
sort_order.id and
+    /// spec.id. It should only be used to create new table metadata from 
scratch.
+    pub fn new(
+        schema: Schema,
+        spec: impl Into<UnboundPartitionSpec>,
+        sort_order: SortOrder,
+        location: String,
+        format_version: FormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new 
table.
+        let (fresh_schema, fresh_spec, fresh_sort_order) =
+            Self::reassign_ids(schema, spec.into(), sort_order)?;
+        let schema_id = fresh_schema.schema_id();
+
+        let builder = Self {
+            metadata: TableMetadata {
+                format_version,
+                table_uuid: Uuid::now_v7(),
+                location: "".to_string(), // Overwritten immediately by 
set_location
+                last_sequence_number: 0,
+                last_updated_ms: 0,    // Overwritten by build() if not set 
before
+                last_column_id: -1,    // Overwritten immediately by 
add_current_schema
+                current_schema_id: -1, // Overwritten immediately by 
add_current_schema
+                schemas: HashMap::new(),
+                partition_specs: HashMap::new(),
+                default_spec: Arc::new(
+                    // The spec id (-1) is just a proxy value and can be any 
negative number.
+                    // 0 would lead to wrong changes in the builder if the 
provided spec by the user is
+                    // also unpartitioned.
+                    // The `default_spec` value is always replaced at the end 
of this method by he `add_default_partition_spec`
+                    // method.
+                    
BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1),
+                ), // Overwritten immediately by add_default_partition_spec
+                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+                properties: HashMap::new(),
+                current_snapshot_id: None,
+                snapshots: HashMap::new(),
+                snapshot_log: vec![],
+                sort_orders: HashMap::new(),
+                metadata_log: vec![],
+                default_sort_order_id: -1, // Overwritten immediately by 
add_default_sort_order
+                refs: HashMap::default(),
+            },
+            changes: vec![],
+            last_added_schema_id: Some(schema_id),
+            last_added_spec_id: None,
+            last_added_order_id: None,
+            previous_history_entry: None,
+        };
+
+        builder
+            .set_location(location)
+            .add_current_schema(fresh_schema)?
+            .add_default_partition_spec(fresh_spec.into_unbound())?
+            .add_default_sort_order(fresh_sort_order)?
+            .set_properties(properties)
+    }
+
+    /// Creates a new table metadata builder from the given metadata to modify 
it.
+
+    /// `current_file_location` is the location where the current version
+    /// of the metadata file is stored. This is used to update the metadata 
log.
+    /// If `current_file_location` is `None`, the metadata log will not be 
updated.
+    /// This should only be used to stage-create tables.
+    #[must_use]
+    pub fn new_from_metadata(
+        previous: TableMetadata,
+        previous_file_location: Option<String>,
+    ) -> Self {
+        Self {
+            previous_history_entry: previous_file_location.map(|l| MetadataLog 
{
+                metadata_file: l,
+                timestamp_ms: previous.last_updated_ms,
+            }),
+            metadata: previous,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_spec_id: None,
+            last_added_order_id: None,
+        }
+    }
+
+    /// Creates a new table metadata builder from the given table creation.
+    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
+        let TableCreation {
+            name: _,
+            location,
+            schema,
+            partition_spec,
+            sort_order,
+            properties,
+        } = table_creation;
+
+        let location = location.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't create table without location",
+            )
+        })?;
+        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
+            spec_id: None,
+            fields: vec![],
+        });
+
+        Self::new(
+            schema,
+            partition_spec,
+            sort_order.unwrap_or(SortOrder::unsorted_order()),
+            location,
+            FormatVersion::V1,
+            properties,
+        )
+    }
+
+    /// Get the current schema with all changes applied up to this point.
+    #[inline]
+    pub fn current_schema(&self) -> &SchemaRef {
+        self.metadata.current_schema()
+    }
+
+    /// Get the current last column id
+    #[inline]
+    pub fn last_column_id(&self) -> i32 {
+        self.metadata.last_column_id
+    }
+
+    /// Get the current last updated timestamp
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.metadata.last_updated_ms
+    }
+
+    /// Changes uuid of table metadata.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+        if self.metadata.table_uuid != uuid {
+            self.metadata.table_uuid = uuid;
+            self.changes.push(TableUpdate::AssignUuid { uuid });
+        }
+
+        self
+    }
+
+    /// Upgrade `FormatVersion`. Downgrades are not allowed.
+    ///
+    /// # Errors
+    /// - Cannot downgrade to older format versions.
+    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> 
Result<Self> {
+        if format_version < self.metadata.format_version {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot downgrade FormatVersion from {} to {}",
+                    self.metadata.format_version, format_version
+                ),
+            ));
+        }
+
+        if format_version != self.metadata.format_version {
+            match format_version {
+                FormatVersion::V1 => {
+                    // No changes needed for V1
+                }
+                FormatVersion::V2 => {
+                    self.metadata.format_version = format_version;
+                    self.changes
+                        .push(TableUpdate::UpgradeFormatVersion { 
format_version });
+                }
+            }
+        }
+
+        Ok(self)
+    }
+
+    /// Set properties. If a property already exists, it will be overwritten.
+    ///
+    /// If a reserved property is set, the corresponding action is performed 
and the property is not persisted.
+    /// Currently the following reserved properties are supported:
+    /// * format-version: Set the format version of the table.
+    ///
+    /// # Errors
+    /// - If properties contains a reserved property
+    pub fn set_properties(mut self, properties: HashMap<String, String>) -> 
Result<Self> {
+        // List of specified properties that are RESERVED and should not be 
persisted.
+        let reserved_properties = properties
+            .keys()
+            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+
+        if !reserved_properties.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Table properties should not contain reserved properties, 
but got: [{}]",
+                    reserved_properties.join(", ")
+                ),
+            ));
+        }
+
+        if properties.is_empty() {
+            return Ok(self);
+        }
+
+        self.metadata.properties.extend(properties.clone());
+        self.changes.push(TableUpdate::SetProperties {
+            updates: properties,
+        });
+
+        Ok(self)
+    }
+
+    /// Remove properties from the table metadata.
+    /// Does nothing if the key is not present.
+    pub fn remove_properties(mut self, properties: &[String]) -> Self {
+        for property in properties {
+            self.metadata.properties.remove(property);
+        }
+
+        if !properties.is_empty() {
+            self.changes.push(TableUpdate::RemoveProperties {
+                removals: properties.to_vec(),
+            });
+        }
+
+        self
+    }
+
+    /// Set the location of the table metadata, stripping any trailing slashes.
+    pub fn set_location(mut self, location: String) -> Self {
+        let location = location.trim_end_matches('/').to_string();
+        if self.metadata.location != location {
+            self.changes.push(TableUpdate::SetLocation {
+                location: location.clone(),
+            });
+            self.metadata.location = location;
+        }
+
+        self
+    }
+
+    /// Add a snapshot to the table metadata.
+    ///
+    /// # Errors
+    /// - No schema has been added to the table metadata.
+    /// - No partition spec has been added to the table metadata.
+    /// - No sort order has been added to the table metadata.
+    /// - Snapshot id already exists.
+    /// - For format version > 1: the sequence number of the snapshot is lower 
than the highest sequence number specified so far.
+    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
+        if self
+            .metadata
+            .snapshots
+            .contains_key(&snapshot.snapshot_id())
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Snapshot already exists for: '{}'", 
snapshot.snapshot_id()),
+            ));
+        }
+
+        if self.metadata.format_version != FormatVersion::V1
+            && snapshot.sequence_number() <= self.metadata.last_sequence_number
+            && snapshot.parent_snapshot_id().is_some()
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add snapshot with sequence number {} older than 
last sequence number {}",
+                    snapshot.sequence_number(),
+                    self.metadata.last_sequence_number
+                )
+            ));
+        }
+
+        if let Some(last) = self.metadata.snapshot_log.last() {
+            // commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew
+            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid snapshot timestamp {}: before last snapshot 
timestamp {}",
+                        snapshot.timestamp_ms(),
+                        last.timestamp_ms
+                    ),
+                ));
+            }
+        }
+
+        if snapshot.timestamp_ms() - self.metadata.last_updated_ms < 
-ONE_MINUTE_MS {

Review Comment:
   I agree that it's not a strict technical requirement. But it doesn't seem to 
be a big problem, since Java has one and a similar check already exists.



##########
crates/iceberg/src/spec/table_metadata_builder.rs:
##########
@@ -0,0 +1,2070 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+    BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, 
Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, 
TableMetadata,
+    UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, 
MAIN_BRANCH, ONE_MINUTE_MS,
+    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, 
PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+    RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are 
applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as  
a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the 
corresponding update
+/// is omitted from `changes`.
+///
+/// Unlike a typical builder pattern, the order of function calls matters.
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added 
first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+    metadata: TableMetadata,
+    changes: Vec<TableUpdate>,
+    last_added_schema_id: Option<i32>,
+    last_added_spec_id: Option<i32>,
+    last_added_order_id: Option<i64>,
+    // None if this is a new table (from_metadata) method not used
+    previous_history_entry: Option<MetadataLog>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `TableMetadata`.
+pub struct TableMetadataBuildResult {
+    /// The new `TableMetadata`.
+    pub metadata: TableMetadata,
+    /// The changes that were applied to the metadata.
+    pub changes: Vec<TableUpdate>,
+    /// Expired metadata logs
+    pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+    const LAST_ADDED: i32 = -1;
+
+    /// Create a `TableMetadata` object from scratch.
+    ///
+    /// This method re-assign ids of fields in the schema, schema.id, 
sort_order.id and
+    /// spec.id. It should only be used to create new table metadata from 
scratch.
+    pub fn new(
+        schema: Schema,
+        spec: impl Into<UnboundPartitionSpec>,
+        sort_order: SortOrder,
+        location: String,
+        format_version: FormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new 
table.
+        let (fresh_schema, fresh_spec, fresh_sort_order) =
+            Self::reassign_ids(schema, spec.into(), sort_order)?;
+        let schema_id = fresh_schema.schema_id();
+
+        let builder = Self {
+            metadata: TableMetadata {
+                format_version,
+                table_uuid: Uuid::now_v7(),
+                location: "".to_string(), // Overwritten immediately by 
set_location
+                last_sequence_number: 0,
+                last_updated_ms: 0,    // Overwritten by build() if not set 
before
+                last_column_id: -1,    // Overwritten immediately by 
add_current_schema
+                current_schema_id: -1, // Overwritten immediately by 
add_current_schema
+                schemas: HashMap::new(),
+                partition_specs: HashMap::new(),
+                default_spec: Arc::new(
+                    // The spec id (-1) is just a proxy value and can be any 
negative number.
+                    // 0 would lead to wrong changes in the builder if the 
provided spec by the user is
+                    // also unpartitioned.
+                    // The `default_spec` value is always replaced at the end 
of this method by he `add_default_partition_spec`
+                    // method.
+                    
BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1),
+                ), // Overwritten immediately by add_default_partition_spec
+                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+                properties: HashMap::new(),
+                current_snapshot_id: None,
+                snapshots: HashMap::new(),
+                snapshot_log: vec![],
+                sort_orders: HashMap::new(),
+                metadata_log: vec![],
+                default_sort_order_id: -1, // Overwritten immediately by 
add_default_sort_order
+                refs: HashMap::default(),
+            },
+            changes: vec![],
+            last_added_schema_id: Some(schema_id),
+            last_added_spec_id: None,
+            last_added_order_id: None,
+            previous_history_entry: None,
+        };
+
+        builder
+            .set_location(location)
+            .add_current_schema(fresh_schema)?
+            .add_default_partition_spec(fresh_spec.into_unbound())?
+            .add_default_sort_order(fresh_sort_order)?
+            .set_properties(properties)
+    }
+
+    /// Creates a new table metadata builder from the given metadata to modify 
it.
+
+    /// `current_file_location` is the location where the current version
+    /// of the metadata file is stored. This is used to update the metadata 
log.
+    /// If `current_file_location` is `None`, the metadata log will not be 
updated.
+    /// This should only be used to stage-create tables.
+    #[must_use]
+    pub fn new_from_metadata(
+        previous: TableMetadata,
+        previous_file_location: Option<String>,
+    ) -> Self {
+        Self {
+            previous_history_entry: previous_file_location.map(|l| MetadataLog 
{
+                metadata_file: l,
+                timestamp_ms: previous.last_updated_ms,
+            }),
+            metadata: previous,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_spec_id: None,
+            last_added_order_id: None,
+        }
+    }
+
+    /// Creates a new table metadata builder from the given table creation.
+    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
+        let TableCreation {
+            name: _,
+            location,
+            schema,
+            partition_spec,
+            sort_order,
+            properties,
+        } = table_creation;
+
+        let location = location.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't create table without location",
+            )
+        })?;
+        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
+            spec_id: None,
+            fields: vec![],
+        });
+
+        Self::new(
+            schema,
+            partition_spec,
+            sort_order.unwrap_or(SortOrder::unsorted_order()),
+            location,
+            FormatVersion::V1,
+            properties,
+        )
+    }
+
+    /// Get the current schema with all changes applied up to this point.
+    #[inline]
+    pub fn current_schema(&self) -> &SchemaRef {
+        self.metadata.current_schema()
+    }
+
+    /// Get the current last column id
+    #[inline]
+    pub fn last_column_id(&self) -> i32 {
+        self.metadata.last_column_id
+    }
+
+    /// Get the current last updated timestamp
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.metadata.last_updated_ms
+    }
+
+    /// Changes uuid of table metadata.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+        if self.metadata.table_uuid != uuid {
+            self.metadata.table_uuid = uuid;
+            self.changes.push(TableUpdate::AssignUuid { uuid });
+        }
+
+        self
+    }
+
+    /// Upgrade `FormatVersion`. Downgrades are not allowed.
+    ///
+    /// # Errors
+    /// - Cannot downgrade to older format versions.
+    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> 
Result<Self> {
+        if format_version < self.metadata.format_version {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot downgrade FormatVersion from {} to {}",
+                    self.metadata.format_version, format_version
+                ),
+            ));
+        }
+
+        if format_version != self.metadata.format_version {
+            match format_version {
+                FormatVersion::V1 => {
+                    // No changes needed for V1
+                }
+                FormatVersion::V2 => {
+                    self.metadata.format_version = format_version;
+                    self.changes
+                        .push(TableUpdate::UpgradeFormatVersion { 
format_version });
+                }
+            }
+        }
+
+        Ok(self)
+    }
+
+    /// Set properties. If a property already exists, it will be overwritten.
+    ///
+    /// If a reserved property is set, the corresponding action is performed 
and the property is not persisted.
+    /// Currently the following reserved properties are supported:
+    /// * format-version: Set the format version of the table.
+    ///
+    /// # Errors
+    /// - If properties contains a reserved property
+    pub fn set_properties(mut self, properties: HashMap<String, String>) -> 
Result<Self> {
+        // List of specified properties that are RESERVED and should not be 
persisted.
+        let reserved_properties = properties
+            .keys()
+            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+
+        if !reserved_properties.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Table properties should not contain reserved properties, 
but got: [{}]",
+                    reserved_properties.join(", ")
+                ),
+            ));
+        }
+
+        if properties.is_empty() {
+            return Ok(self);
+        }
+
+        self.metadata.properties.extend(properties.clone());
+        self.changes.push(TableUpdate::SetProperties {
+            updates: properties,
+        });
+
+        Ok(self)
+    }
+
+    /// Remove properties from the table metadata.
+    /// Does nothing if the key is not present.
+    pub fn remove_properties(mut self, properties: &[String]) -> Self {
+        for property in properties {
+            self.metadata.properties.remove(property);
+        }
+
+        if !properties.is_empty() {
+            self.changes.push(TableUpdate::RemoveProperties {
+                removals: properties.to_vec(),
+            });
+        }
+
+        self
+    }
+
+    /// Set the location of the table metadata, stripping any trailing slashes.
+    pub fn set_location(mut self, location: String) -> Self {
+        let location = location.trim_end_matches('/').to_string();
+        if self.metadata.location != location {
+            self.changes.push(TableUpdate::SetLocation {
+                location: location.clone(),
+            });
+            self.metadata.location = location;
+        }
+
+        self
+    }
+
+    /// Add a snapshot to the table metadata.
+    ///
+    /// # Errors
+    /// - No schema has been added to the table metadata.
+    /// - No partition spec has been added to the table metadata.
+    /// - No sort order has been added to the table metadata.
+    /// - Snapshot id already exists.
+    /// - For format version > 1: the sequence number of the snapshot is lower 
than the highest sequence number specified so far.
+    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
+        if self
+            .metadata
+            .snapshots
+            .contains_key(&snapshot.snapshot_id())
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Snapshot already exists for: '{}'", 
snapshot.snapshot_id()),
+            ));
+        }
+
+        if self.metadata.format_version != FormatVersion::V1
+            && snapshot.sequence_number() <= self.metadata.last_sequence_number
+            && snapshot.parent_snapshot_id().is_some()
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add snapshot with sequence number {} older than 
last sequence number {}",
+                    snapshot.sequence_number(),
+                    self.metadata.last_sequence_number
+                )
+            ));
+        }
+
+        if let Some(last) = self.metadata.snapshot_log.last() {
+            // commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew
+            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid snapshot timestamp {}: before last snapshot 
timestamp {}",
+                        snapshot.timestamp_ms(),
+                        last.timestamp_ms
+                    ),
+                ));
+            }
+        }
+
+        if snapshot.timestamp_ms() - self.metadata.last_updated_ms < 
-ONE_MINUTE_MS {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Invalid snapshot timestamp {}: before last updated 
timestamp {}",
+                    snapshot.timestamp_ms(),
+                    self.metadata.last_updated_ms
+                ),
+            ));
+        }
+
+        // Mutation happens in next line - must be infallible from here
+        self.changes.push(TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        });
+
+        self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        self.metadata.last_sequence_number = snapshot.sequence_number();
+        self.metadata
+            .snapshots
+            .insert(snapshot.snapshot_id(), snapshot.into());
+
+        Ok(self)
+    }
+
+    /// Append a snapshot to the specified branch.
+    /// If branch is not specified, the snapshot is appended to the main 
branch.
+    /// The `ref` must already exist. Retention settings from the `ref` are 
re-used.
+    ///
+    /// # Errors
+    /// - The ref is unknown.
+    /// - Any of the preconditions of `self.add_snapshot` are not met.
+    pub fn append_snapshot(self, snapshot: Snapshot, ref_name: Option<&str>) 
-> Result<Self> {
+        let ref_name = ref_name.unwrap_or(MAIN_BRANCH);
+        let mut reference = self
+            .metadata
+            .refs
+            .get(ref_name)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Cannot append snapshot to unknown ref: '{}'", 
ref_name),
+                )
+            })?
+            .clone();
+
+        reference.snapshot_id = snapshot.snapshot_id();
+
+        self.add_snapshot(snapshot)?.set_ref(ref_name, reference)
+    }
+
+    /// Remove snapshots by its ids from the table metadata.
+    /// Does nothing if a snapshot id is not present.
+    /// Keeps as changes only the snapshots that were actually removed.
+    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
+        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
+
+        self.metadata.snapshots.retain(|k, _| {
+            if snapshot_ids.contains(k) {
+                removed_snapshots.push(*k);
+                false
+            } else {
+                true
+            }
+        });
+
+        if !removed_snapshots.is_empty() {
+            self.changes.push(TableUpdate::RemoveSnapshots {
+                snapshot_ids: removed_snapshots,
+            });
+        }
+
+        // Remove refs that are no longer valid
+        self.metadata
+            .refs
+            .retain(|_, v| 
self.metadata.snapshots.contains_key(&v.snapshot_id));
+
+        self
+    }
+
+    /// Set a reference to a snapshot.
+    ///
+    /// # Errors
+    /// - The snapshot id is unknown.
+    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> 
Result<Self> {
+        if self
+            .metadata
+            .refs
+            .get(ref_name)
+            .is_some_and(|snap_ref| snap_ref.eq(&reference))
+        {
+            return Ok(self);
+        }
+
+        let Some(snapshot) = 
self.metadata.snapshots.get(&reference.snapshot_id) else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
+                    reference.snapshot_id
+                ),
+            ));
+        };
+
+        // Update last_updated_ms to the exact timestamp of the snapshot if it 
was added in this commit
+        let is_added_snapshot = self.changes.iter().any(|update| {
+            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if 
snap.snapshot_id() == snapshot.snapshot_id())
+        });
+        if is_added_snapshot {
+            self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        }
+
+        // Current snapshot id is set only for the main branch
+        if ref_name == MAIN_BRANCH {
+            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
+            if self.metadata.last_updated_ms == i64::default() {
+                self.metadata.last_updated_ms = 
chrono::Utc::now().timestamp_millis();
+            };
+
+            self.metadata.snapshot_log.push(SnapshotLog {
+                snapshot_id: snapshot.snapshot_id(),
+                timestamp_ms: self.metadata.last_updated_ms,
+            });
+        }
+
+        self.changes.push(TableUpdate::SetSnapshotRef {
+            ref_name: ref_name.to_string(),
+            reference: reference.clone(),
+        });
+        self.metadata.refs.insert(ref_name.to_string(), reference);
+
+        Ok(self)
+    }
+
+    /// Remove a reference
+    ///
+    /// If `ref_name='main'` the current snapshot id is set to -1.
+    pub fn remove_ref(mut self, ref_name: &str) -> Self {
+        if ref_name == MAIN_BRANCH {
+            self.metadata.current_snapshot_id = 
Some(i64::from(Self::LAST_ADDED));
+            self.metadata.snapshot_log.clear();
+        }
+
+        if self.metadata.refs.remove(ref_name).is_some() || ref_name == 
MAIN_BRANCH {

Review Comment:
   I agree that if `ref_name == MAIN_BRANCH`, we have changed it. My
assumption is that we always have `MAIN_BRANCH`, since it's the default branch?
Please correct me if I'm wrong.



##########
crates/iceberg/src/spec/table_metadata_builder.rs:
##########
@@ -0,0 +1,2070 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+    BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, 
Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, 
TableMetadata,
+    UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, 
MAIN_BRANCH, ONE_MINUTE_MS,
+    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, 
PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+    RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are 
applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as  
a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the 
corresponding update
+/// is omitted from `changes`.
+///
+/// Unlike a typical builder pattern, the order of function calls matters.
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added 
first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+    metadata: TableMetadata,
+    changes: Vec<TableUpdate>,
+    last_added_schema_id: Option<i32>,
+    last_added_spec_id: Option<i32>,
+    last_added_order_id: Option<i64>,
+    // None if this is a new table (from_metadata) method not used
+    previous_history_entry: Option<MetadataLog>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `TableMetadata`.
+pub struct TableMetadataBuildResult {
+    /// The new `TableMetadata`.
+    pub metadata: TableMetadata,
+    /// The changes that were applied to the metadata.
+    pub changes: Vec<TableUpdate>,
+    /// Expired metadata logs
+    pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+    const LAST_ADDED: i32 = -1;
+
+    /// Create a `TableMetadata` object from scratch.
+    ///
+    /// This method re-assign ids of fields in the schema, schema.id, 
sort_order.id and
+    /// spec.id. It should only be used to create new table metadata from 
scratch.
+    pub fn new(
+        schema: Schema,
+        spec: impl Into<UnboundPartitionSpec>,
+        sort_order: SortOrder,
+        location: String,
+        format_version: FormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new 
table.
+        let (fresh_schema, fresh_spec, fresh_sort_order) =
+            Self::reassign_ids(schema, spec.into(), sort_order)?;
+        let schema_id = fresh_schema.schema_id();
+
+        let builder = Self {
+            metadata: TableMetadata {
+                format_version,
+                table_uuid: Uuid::now_v7(),
+                location: "".to_string(), // Overwritten immediately by 
set_location
+                last_sequence_number: 0,
+                last_updated_ms: 0,    // Overwritten by build() if not set 
before
+                last_column_id: -1,    // Overwritten immediately by 
add_current_schema
+                current_schema_id: -1, // Overwritten immediately by 
add_current_schema
+                schemas: HashMap::new(),
+                partition_specs: HashMap::new(),
+                default_spec: Arc::new(
+                    // The spec id (-1) is just a proxy value and can be any 
negative number.
+                    // 0 would lead to wrong changes in the builder if the 
provided spec by the user is
+                    // also unpartitioned.
+                    // The `default_spec` value is always replaced at the end 
of this method by he `add_default_partition_spec`
+                    // method.
+                    
BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1),
+                ), // Overwritten immediately by add_default_partition_spec
+                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+                properties: HashMap::new(),
+                current_snapshot_id: None,
+                snapshots: HashMap::new(),
+                snapshot_log: vec![],
+                sort_orders: HashMap::new(),
+                metadata_log: vec![],
+                default_sort_order_id: -1, // Overwritten immediately by 
add_default_sort_order
+                refs: HashMap::default(),
+            },
+            changes: vec![],
+            last_added_schema_id: Some(schema_id),
+            last_added_spec_id: None,
+            last_added_order_id: None,
+            previous_history_entry: None,
+        };
+
+        builder
+            .set_location(location)
+            .add_current_schema(fresh_schema)?
+            .add_default_partition_spec(fresh_spec.into_unbound())?
+            .add_default_sort_order(fresh_sort_order)?
+            .set_properties(properties)
+    }
+
+    /// Creates a new table metadata builder from the given metadata to modify 
it.
+
+    /// `current_file_location` is the location where the current version
+    /// of the metadata file is stored. This is used to update the metadata 
log.
+    /// If `current_file_location` is `None`, the metadata log will not be 
updated.
+    /// This should only be used to stage-create tables.
+    #[must_use]
+    pub fn new_from_metadata(
+        previous: TableMetadata,
+        previous_file_location: Option<String>,
+    ) -> Self {
+        Self {
+            previous_history_entry: previous_file_location.map(|l| MetadataLog 
{
+                metadata_file: l,
+                timestamp_ms: previous.last_updated_ms,
+            }),
+            metadata: previous,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_spec_id: None,
+            last_added_order_id: None,
+        }
+    }
+
+    /// Creates a new table metadata builder from the given table creation.
+    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
+        let TableCreation {
+            name: _,
+            location,
+            schema,
+            partition_spec,
+            sort_order,
+            properties,
+        } = table_creation;
+
+        let location = location.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't create table without location",
+            )
+        })?;
+        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
+            spec_id: None,
+            fields: vec![],
+        });
+
+        Self::new(
+            schema,
+            partition_spec,
+            sort_order.unwrap_or(SortOrder::unsorted_order()),
+            location,
+            FormatVersion::V1,
+            properties,
+        )
+    }
+
+    /// Get the current schema with all changes applied up to this point.
+    #[inline]
+    pub fn current_schema(&self) -> &SchemaRef {
+        self.metadata.current_schema()
+    }
+
+    /// Get the current last column id
+    #[inline]
+    pub fn last_column_id(&self) -> i32 {
+        self.metadata.last_column_id
+    }
+
+    /// Get the current last updated timestamp
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.metadata.last_updated_ms
+    }
+
+    /// Changes uuid of table metadata.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+        if self.metadata.table_uuid != uuid {
+            self.metadata.table_uuid = uuid;
+            self.changes.push(TableUpdate::AssignUuid { uuid });
+        }
+
+        self
+    }
+
+    /// Upgrade `FormatVersion`. Downgrades are not allowed.
+    ///
+    /// # Errors
+    /// - Cannot downgrade to older format versions.
+    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> 
Result<Self> {
+        if format_version < self.metadata.format_version {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot downgrade FormatVersion from {} to {}",
+                    self.metadata.format_version, format_version
+                ),
+            ));
+        }
+
+        if format_version != self.metadata.format_version {
+            match format_version {
+                FormatVersion::V1 => {
+                    // No changes needed for V1
+                }
+                FormatVersion::V2 => {
+                    self.metadata.format_version = format_version;
+                    self.changes
+                        .push(TableUpdate::UpgradeFormatVersion { 
format_version });
+                }
+            }
+        }
+
+        Ok(self)
+    }
+
+    /// Set properties. If a property already exists, it will be overwritten.
+    ///
+    /// If a reserved property is set, the corresponding action is performed 
and the property is not persisted.
+    /// Currently the following reserved properties are supported:
+    /// * format-version: Set the format version of the table.
+    ///
+    /// # Errors
+    /// - If properties contains a reserved property
+    pub fn set_properties(mut self, properties: HashMap<String, String>) -> 
Result<Self> {
+        // List of specified properties that are RESERVED and should not be 
persisted.
+        let reserved_properties = properties
+            .keys()
+            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+
+        if !reserved_properties.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Table properties should not contain reserved properties, 
but got: [{}]",
+                    reserved_properties.join(", ")
+                ),
+            ));
+        }
+
+        if properties.is_empty() {
+            return Ok(self);
+        }
+
+        self.metadata.properties.extend(properties.clone());
+        self.changes.push(TableUpdate::SetProperties {
+            updates: properties,
+        });
+
+        Ok(self)
+    }
+
+    /// Remove properties from the table metadata.
+    /// Does nothing if the key is not present.
+    pub fn remove_properties(mut self, properties: &[String]) -> Self {
+        for property in properties {
+            self.metadata.properties.remove(property);
+        }
+
+        if !properties.is_empty() {
+            self.changes.push(TableUpdate::RemoveProperties {
+                removals: properties.to_vec(),
+            });
+        }
+
+        self
+    }
+
+    /// Set the location of the table metadata, stripping any trailing slashes.
+    pub fn set_location(mut self, location: String) -> Self {
+        let location = location.trim_end_matches('/').to_string();
+        if self.metadata.location != location {
+            self.changes.push(TableUpdate::SetLocation {
+                location: location.clone(),
+            });
+            self.metadata.location = location;
+        }
+
+        self
+    }
+
+    /// Add a snapshot to the table metadata.
+    ///
+    /// # Errors
+    /// - No schema has been added to the table metadata.
+    /// - No partition spec has been added to the table metadata.
+    /// - No sort order has been added to the table metadata.
+    /// - Snapshot id already exists.
+    /// - For format version > 1: the sequence number of the snapshot is lower 
than the highest sequence number specified so far.
+    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
+        if self
+            .metadata
+            .snapshots
+            .contains_key(&snapshot.snapshot_id())
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Snapshot already exists for: '{}'", 
snapshot.snapshot_id()),
+            ));
+        }
+
+        if self.metadata.format_version != FormatVersion::V1
+            && snapshot.sequence_number() <= self.metadata.last_sequence_number
+            && snapshot.parent_snapshot_id().is_some()
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add snapshot with sequence number {} older than 
last sequence number {}",
+                    snapshot.sequence_number(),
+                    self.metadata.last_sequence_number
+                )
+            ));
+        }
+
+        if let Some(last) = self.metadata.snapshot_log.last() {
+            // commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew
+            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid snapshot timestamp {}: before last snapshot 
timestamp {}",
+                        snapshot.timestamp_ms(),
+                        last.timestamp_ms
+                    ),
+                ));
+            }
+        }
+
+        if snapshot.timestamp_ms() - self.metadata.last_updated_ms < 
-ONE_MINUTE_MS {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Invalid snapshot timestamp {}: before last updated 
timestamp {}",
+                    snapshot.timestamp_ms(),
+                    self.metadata.last_updated_ms
+                ),
+            ));
+        }
+
+        // Mutation happens in next line - must be infallible from here
+        self.changes.push(TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        });
+
+        self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        self.metadata.last_sequence_number = snapshot.sequence_number();
+        self.metadata
+            .snapshots
+            .insert(snapshot.snapshot_id(), snapshot.into());
+
+        Ok(self)
+    }
+
+    /// Append a snapshot to the specified branch.
+    /// If branch is not specified, the snapshot is appended to the main 
branch.
+    /// The `ref` must already exist. Retention settings from the `ref` are 
re-used.

Review Comment:
   I think in Iceberg's terminology a tag is a special branch, while `ref`
typically means `SnapshotRef`, which includes more things such as retention
time. I've updated it to "`branch` or `tag`"; what do you think?



##########
crates/iceberg/src/spec/table_metadata_builder.rs:
##########
@@ -0,0 +1,2070 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+    BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, 
Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, 
TableMetadata,
+    UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, 
MAIN_BRANCH, ONE_MINUTE_MS,
+    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, 
PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+    RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are 
applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as  
a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the 
corresponding update
+/// is omitted from `changes`.
+///
+/// Unlike a typical builder pattern, the order of function calls matters.
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added 
first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+    metadata: TableMetadata,
+    changes: Vec<TableUpdate>,
+    last_added_schema_id: Option<i32>,
+    last_added_spec_id: Option<i32>,
+    last_added_order_id: Option<i64>,
+    // None if this is a new table (from_metadata) method not used
+    previous_history_entry: Option<MetadataLog>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `TableMetadata`.
+pub struct TableMetadataBuildResult {
+    /// The new `TableMetadata`.
+    pub metadata: TableMetadata,
+    /// The changes that were applied to the metadata.
+    pub changes: Vec<TableUpdate>,
+    /// Expired metadata logs
+    pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+    const LAST_ADDED: i32 = -1;
+
+    /// Create a `TableMetadata` object from scratch.
+    ///
+    /// This method re-assign ids of fields in the schema, schema.id, 
sort_order.id and
+    /// spec.id. It should only be used to create new table metadata from 
scratch.
+    pub fn new(
+        schema: Schema,
+        spec: impl Into<UnboundPartitionSpec>,
+        sort_order: SortOrder,
+        location: String,
+        format_version: FormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new 
table.
+        let (fresh_schema, fresh_spec, fresh_sort_order) =
+            Self::reassign_ids(schema, spec.into(), sort_order)?;
+        let schema_id = fresh_schema.schema_id();
+
+        let builder = Self {
+            metadata: TableMetadata {
+                format_version,
+                table_uuid: Uuid::now_v7(),
+                location: "".to_string(), // Overwritten immediately by 
set_location
+                last_sequence_number: 0,
+                last_updated_ms: 0,    // Overwritten by build() if not set 
before
+                last_column_id: -1,    // Overwritten immediately by 
add_current_schema
+                current_schema_id: -1, // Overwritten immediately by 
add_current_schema
+                schemas: HashMap::new(),
+                partition_specs: HashMap::new(),
+                default_spec: Arc::new(
+                    // The spec id (-1) is just a proxy value and can be any 
negative number.
+                    // 0 would lead to wrong changes in the builder if the 
provided spec by the user is
+                    // also unpartitioned.
+                    // The `default_spec` value is always replaced at the end 
of this method by he `add_default_partition_spec`
+                    // method.
+                    
BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1),
+                ), // Overwritten immediately by add_default_partition_spec
+                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+                properties: HashMap::new(),
+                current_snapshot_id: None,
+                snapshots: HashMap::new(),
+                snapshot_log: vec![],
+                sort_orders: HashMap::new(),
+                metadata_log: vec![],
+                default_sort_order_id: -1, // Overwritten immediately by 
add_default_sort_order
+                refs: HashMap::default(),
+            },
+            changes: vec![],
+            last_added_schema_id: Some(schema_id),
+            last_added_spec_id: None,
+            last_added_order_id: None,
+            previous_history_entry: None,
+        };
+
+        builder
+            .set_location(location)
+            .add_current_schema(fresh_schema)?
+            .add_default_partition_spec(fresh_spec.into_unbound())?
+            .add_default_sort_order(fresh_sort_order)?
+            .set_properties(properties)
+    }
+
+    /// Creates a new table metadata builder from the given metadata to modify 
it.
+
+    /// `current_file_location` is the location where the current version
+    /// of the metadata file is stored. This is used to update the metadata 
log.
+    /// If `current_file_location` is `None`, the metadata log will not be 
updated.
+    /// This should only be used to stage-create tables.
+    #[must_use]
+    pub fn new_from_metadata(
+        previous: TableMetadata,
+        previous_file_location: Option<String>,
+    ) -> Self {
+        Self {
+            previous_history_entry: previous_file_location.map(|l| MetadataLog 
{
+                metadata_file: l,
+                timestamp_ms: previous.last_updated_ms,
+            }),
+            metadata: previous,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_spec_id: None,
+            last_added_order_id: None,
+        }
+    }
+
+    /// Creates a new table metadata builder from the given table creation.
+    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
+        let TableCreation {
+            name: _,
+            location,
+            schema,
+            partition_spec,
+            sort_order,
+            properties,
+        } = table_creation;
+
+        let location = location.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't create table without location",
+            )
+        })?;
+        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
+            spec_id: None,
+            fields: vec![],
+        });
+
+        Self::new(
+            schema,
+            partition_spec,
+            sort_order.unwrap_or(SortOrder::unsorted_order()),
+            location,
+            FormatVersion::V1,
+            properties,
+        )
+    }
+
+    /// Get the current schema with all changes applied up to this point.
+    #[inline]
+    pub fn current_schema(&self) -> &SchemaRef {
+        self.metadata.current_schema()
+    }
+
+    /// Get the current last column id
+    #[inline]
+    pub fn last_column_id(&self) -> i32 {
+        self.metadata.last_column_id
+    }
+
+    /// Get the current last updated timestamp
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.metadata.last_updated_ms
+    }
+
+    /// Changes uuid of table metadata.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+        if self.metadata.table_uuid != uuid {
+            self.metadata.table_uuid = uuid;
+            self.changes.push(TableUpdate::AssignUuid { uuid });
+        }
+
+        self
+    }
+
+    /// Upgrade `FormatVersion`. Downgrades are not allowed.
+    ///
+    /// # Errors
+    /// - Cannot downgrade to older format versions.
+    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> 
Result<Self> {
+        if format_version < self.metadata.format_version {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot downgrade FormatVersion from {} to {}",
+                    self.metadata.format_version, format_version
+                ),
+            ));
+        }
+
+        if format_version != self.metadata.format_version {
+            match format_version {
+                FormatVersion::V1 => {
+                    // No changes needed for V1
+                }
+                FormatVersion::V2 => {
+                    self.metadata.format_version = format_version;
+                    self.changes
+                        .push(TableUpdate::UpgradeFormatVersion { 
format_version });
+                }
+            }
+        }
+
+        Ok(self)
+    }
+
+    /// Set properties. If a property already exists, it will be overwritten.
+    ///
+    /// If a reserved property is set, the corresponding action is performed 
and the property is not persisted.
+    /// Currently the following reserved properties are supported:
+    /// * format-version: Set the format version of the table.
+    ///
+    /// # Errors
+    /// - If properties contains a reserved property
+    pub fn set_properties(mut self, properties: HashMap<String, String>) -> 
Result<Self> {
+        // List of specified properties that are RESERVED and should not be 
persisted.
+        let reserved_properties = properties
+            .keys()
+            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+
+        if !reserved_properties.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Table properties should not contain reserved properties, 
but got: [{}]",
+                    reserved_properties.join(", ")
+                ),
+            ));
+        }
+
+        if properties.is_empty() {
+            return Ok(self);
+        }
+
+        self.metadata.properties.extend(properties.clone());
+        self.changes.push(TableUpdate::SetProperties {
+            updates: properties,
+        });
+
+        Ok(self)
+    }
+
+    /// Remove properties from the table metadata.
+    /// Does nothing if the key is not present.
+    pub fn remove_properties(mut self, properties: &[String]) -> Self {
+        for property in properties {
+            self.metadata.properties.remove(property);
+        }
+
+        if !properties.is_empty() {
+            self.changes.push(TableUpdate::RemoveProperties {
+                removals: properties.to_vec(),
+            });
+        }
+
+        self
+    }
+
+    /// Set the location of the table metadata, stripping any trailing slashes.
+    pub fn set_location(mut self, location: String) -> Self {
+        let location = location.trim_end_matches('/').to_string();
+        if self.metadata.location != location {
+            self.changes.push(TableUpdate::SetLocation {
+                location: location.clone(),
+            });
+            self.metadata.location = location;
+        }
+
+        self
+    }
+
+    /// Add a snapshot to the table metadata.
+    ///
+    /// # Errors
+    /// - No schema has been added to the table metadata.
+    /// - No partition spec has been added to the table metadata.
+    /// - No sort order has been added to the table metadata.
+    /// - Snapshot id already exists.
+    /// - For format version > 1: the sequence number of the snapshot is lower 
than the highest sequence number specified so far.
+    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
+        if self
+            .metadata
+            .snapshots
+            .contains_key(&snapshot.snapshot_id())
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Snapshot already exists for: '{}'", 
snapshot.snapshot_id()),
+            ));
+        }
+
+        if self.metadata.format_version != FormatVersion::V1
+            && snapshot.sequence_number() <= self.metadata.last_sequence_number
+            && snapshot.parent_snapshot_id().is_some()
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add snapshot with sequence number {} older than 
last sequence number {}",
+                    snapshot.sequence_number(),
+                    self.metadata.last_sequence_number
+                )
+            ));
+        }
+
+        if let Some(last) = self.metadata.snapshot_log.last() {
+            // commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew
+            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid snapshot timestamp {}: before last snapshot 
timestamp {}",
+                        snapshot.timestamp_ms(),
+                        last.timestamp_ms
+                    ),
+                ));
+            }
+        }
+
+        if snapshot.timestamp_ms() - self.metadata.last_updated_ms < 
-ONE_MINUTE_MS {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Invalid snapshot timestamp {}: before last updated 
timestamp {}",
+                    snapshot.timestamp_ms(),
+                    self.metadata.last_updated_ms
+                ),
+            ));
+        }
+
+        // Mutation happens in next line - must be infallible from here
+        self.changes.push(TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        });
+
+        self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        self.metadata.last_sequence_number = snapshot.sequence_number();
+        self.metadata
+            .snapshots
+            .insert(snapshot.snapshot_id(), snapshot.into());
+
+        Ok(self)
+    }
+
+    /// Append a snapshot to the specified branch.
+    /// If branch is not specified, the snapshot is appended to the main 
branch.
+    /// The `ref` must already exist. Retention settings from the `ref` are 
re-used.
+    ///
+    /// # Errors
+    /// - The ref is unknown.
+    /// - Any of the preconditions of `self.add_snapshot` are not met.
+    pub fn append_snapshot(self, snapshot: Snapshot, ref_name: Option<&str>) 
-> Result<Self> {

Review Comment:
   1. In fact, I think that method is the wrong design. The implementation lacks 
some necessary checks, and it's inconsistent with other parts, e.g. 
`TableMetadata` is designed to be immutable, and all modifications should go 
through `TableMetadataBuilder`. I think we should mark that API as deprecated 
and remove it in the future.
   2. Yes, but it's weird when it has a `branch` argument. The reason I 
suggest this is that I want to keep the API consistent with the [Java 
implementation](https://github.com/apache/iceberg/blob/fe23584fc3af9f0ea1371989030b0a99affb233f/core/src/main/java/org/apache/iceberg/TableMetadata.java#L1204).
   3. Similar to 2, I want to keep the API name consistent with Java. And you are 
right that the name could refer to either a branch or a tag. I'm OK with the 
argument name, but I prefer to keep the function name consistent with the Java API.



##########
crates/iceberg/src/spec/table_metadata_builder.rs:
##########
@@ -0,0 +1,2070 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+    BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, 
Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, 
TableMetadata,
+    UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, 
MAIN_BRANCH, ONE_MINUTE_MS,
+    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, 
PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+    RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1;
+
+/// Manipulating table metadata.
+///
+/// For this builder the order of called functions matters. Functions are 
applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as  
a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the 
corresponding update
+/// is omitted from `changes`.
+///
+/// Unlike a typical builder pattern, the order of function calls matters.
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added 
first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+    metadata: TableMetadata,
+    changes: Vec<TableUpdate>,
+    last_added_schema_id: Option<i32>,
+    last_added_spec_id: Option<i32>,
+    last_added_order_id: Option<i64>,
+    // None if this is a new table (from_metadata) method not used
+    previous_history_entry: Option<MetadataLog>,
+}
+
+#[derive(Debug, Clone, PartialEq)]
+/// Result of modifying or creating a `TableMetadata`.
+pub struct TableMetadataBuildResult {
+    /// The new `TableMetadata`.
+    pub metadata: TableMetadata,
+    /// The changes that were applied to the metadata.
+    pub changes: Vec<TableUpdate>,
+    /// Expired metadata logs
+    pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+    const LAST_ADDED: i32 = -1;
+
+    /// Create a `TableMetadata` object from scratch.
+    ///
+    /// This method re-assign ids of fields in the schema, schema.id, 
sort_order.id and
+    /// spec.id. It should only be used to create new table metadata from 
scratch.
+    pub fn new(
+        schema: Schema,
+        spec: impl Into<UnboundPartitionSpec>,
+        sort_order: SortOrder,
+        location: String,
+        format_version: FormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new 
table.
+        let (fresh_schema, fresh_spec, fresh_sort_order) =
+            Self::reassign_ids(schema, spec.into(), sort_order)?;
+        let schema_id = fresh_schema.schema_id();
+
+        let builder = Self {
+            metadata: TableMetadata {
+                format_version,
+                table_uuid: Uuid::now_v7(),
+                location: "".to_string(), // Overwritten immediately by 
set_location
+                last_sequence_number: 0,
+                last_updated_ms: 0,    // Overwritten by build() if not set 
before
+                last_column_id: -1,    // Overwritten immediately by 
add_current_schema
+                current_schema_id: -1, // Overwritten immediately by 
add_current_schema
+                schemas: HashMap::new(),
+                partition_specs: HashMap::new(),
+                default_spec: Arc::new(
+                    // The spec id (-1) is just a proxy value and can be any 
negative number.
+                    // 0 would lead to wrong changes in the builder if the 
provided spec by the user is
+                    // also unpartitioned.
+                    // The `default_spec` value is always replaced at the end 
of this method by he `add_default_partition_spec`
+                    // method.
+                    
BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1),
+                ), // Overwritten immediately by add_default_partition_spec
+                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+                properties: HashMap::new(),
+                current_snapshot_id: None,
+                snapshots: HashMap::new(),
+                snapshot_log: vec![],
+                sort_orders: HashMap::new(),
+                metadata_log: vec![],
+                default_sort_order_id: -1, // Overwritten immediately by 
add_default_sort_order
+                refs: HashMap::default(),
+            },
+            changes: vec![],
+            last_added_schema_id: Some(schema_id),
+            last_added_spec_id: None,
+            last_added_order_id: None,
+            previous_history_entry: None,
+        };
+
+        builder
+            .set_location(location)
+            .add_current_schema(fresh_schema)?
+            .add_default_partition_spec(fresh_spec.into_unbound())?
+            .add_default_sort_order(fresh_sort_order)?
+            .set_properties(properties)
+    }
+
+    /// Creates a new table metadata builder from the given metadata to modify 
it.
+
+    /// `current_file_location` is the location where the current version
+    /// of the metadata file is stored. This is used to update the metadata 
log.
+    /// If `current_file_location` is `None`, the metadata log will not be 
updated.
+    /// This should only be used to stage-create tables.
+    #[must_use]
+    pub fn new_from_metadata(
+        previous: TableMetadata,
+        previous_file_location: Option<String>,
+    ) -> Self {
+        Self {
+            previous_history_entry: previous_file_location.map(|l| MetadataLog 
{
+                metadata_file: l,
+                timestamp_ms: previous.last_updated_ms,
+            }),
+            metadata: previous,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_spec_id: None,
+            last_added_order_id: None,
+        }
+    }
+
+    /// Creates a new table metadata builder from the given table creation.
+    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
+        let TableCreation {
+            name: _,
+            location,
+            schema,
+            partition_spec,
+            sort_order,
+            properties,
+        } = table_creation;
+
+        let location = location.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't create table without location",
+            )
+        })?;
+        let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec {
+            spec_id: None,
+            fields: vec![],
+        });
+
+        Self::new(
+            schema,
+            partition_spec,
+            sort_order.unwrap_or(SortOrder::unsorted_order()),
+            location,
+            FormatVersion::V1,
+            properties,
+        )
+    }
+
+    /// Get the current schema with all changes applied up to this point.
+    #[inline]
+    pub fn current_schema(&self) -> &SchemaRef {
+        self.metadata.current_schema()
+    }
+
+    /// Get the current last column id
+    #[inline]
+    pub fn last_column_id(&self) -> i32 {
+        self.metadata.last_column_id
+    }
+
+    /// Get the current last updated timestamp
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.metadata.last_updated_ms
+    }
+
+    /// Changes uuid of table metadata.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+        if self.metadata.table_uuid != uuid {
+            self.metadata.table_uuid = uuid;
+            self.changes.push(TableUpdate::AssignUuid { uuid });
+        }
+
+        self
+    }
+
+    /// Upgrade `FormatVersion`. Downgrades are not allowed.
+    ///
+    /// # Errors
+    /// - Cannot downgrade to older format versions.
+    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> 
Result<Self> {
+        if format_version < self.metadata.format_version {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot downgrade FormatVersion from {} to {}",
+                    self.metadata.format_version, format_version
+                ),
+            ));
+        }
+
+        if format_version != self.metadata.format_version {
+            match format_version {
+                FormatVersion::V1 => {
+                    // No changes needed for V1
+                }
+                FormatVersion::V2 => {
+                    self.metadata.format_version = format_version;
+                    self.changes
+                        .push(TableUpdate::UpgradeFormatVersion { 
format_version });
+                }
+            }
+        }
+
+        Ok(self)
+    }
+
+    /// Set properties. If a property already exists, it will be overwritten.
+    ///
+    /// If a reserved property is set, the corresponding action is performed 
and the property is not persisted.
+    /// Currently the following reserved properties are supported:
+    /// * format-version: Set the format version of the table.
+    ///
+    /// # Errors
+    /// - If properties contains a reserved property
+    pub fn set_properties(mut self, properties: HashMap<String, String>) -> 
Result<Self> {
+        // List of specified properties that are RESERVED and should not be 
persisted.
+        let reserved_properties = properties
+            .keys()
+            .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str()))
+            .map(ToString::to_string)
+            .collect::<Vec<_>>();
+
+        if !reserved_properties.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Table properties should not contain reserved properties, 
but got: [{}]",
+                    reserved_properties.join(", ")
+                ),
+            ));
+        }
+
+        if properties.is_empty() {
+            return Ok(self);
+        }
+
+        self.metadata.properties.extend(properties.clone());
+        self.changes.push(TableUpdate::SetProperties {
+            updates: properties,
+        });
+
+        Ok(self)
+    }
+
+    /// Remove properties from the table metadata.
+    /// Does nothing if the key is not present.
+    pub fn remove_properties(mut self, properties: &[String]) -> Self {
+        for property in properties {
+            self.metadata.properties.remove(property);
+        }
+
+        if !properties.is_empty() {
+            self.changes.push(TableUpdate::RemoveProperties {
+                removals: properties.to_vec(),
+            });
+        }
+
+        self
+    }
+
+    /// Set the location of the table metadata, stripping any trailing slashes.
+    pub fn set_location(mut self, location: String) -> Self {
+        let location = location.trim_end_matches('/').to_string();
+        if self.metadata.location != location {
+            self.changes.push(TableUpdate::SetLocation {
+                location: location.clone(),
+            });
+            self.metadata.location = location;
+        }
+
+        self
+    }
+
+    /// Add a snapshot to the table metadata.
+    ///
+    /// # Errors
+    /// - No schema has been added to the table metadata.
+    /// - No partition spec has been added to the table metadata.
+    /// - No sort order has been added to the table metadata.
+    /// - Snapshot id already exists.
+    /// - For format version > 1: the sequence number of the snapshot is lower 
than the highest sequence number specified so far.
+    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
+        if self
+            .metadata
+            .snapshots
+            .contains_key(&snapshot.snapshot_id())
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Snapshot already exists for: '{}'", 
snapshot.snapshot_id()),
+            ));
+        }
+
+        if self.metadata.format_version != FormatVersion::V1
+            && snapshot.sequence_number() <= self.metadata.last_sequence_number
+            && snapshot.parent_snapshot_id().is_some()
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add snapshot with sequence number {} older than 
last sequence number {}",
+                    snapshot.sequence_number(),
+                    self.metadata.last_sequence_number
+                )
+            ));
+        }
+
+        if let Some(last) = self.metadata.snapshot_log.last() {
+            // commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew
+            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid snapshot timestamp {}: before last snapshot 
timestamp {}",
+                        snapshot.timestamp_ms(),
+                        last.timestamp_ms
+                    ),
+                ));
+            }
+        }
+
+        if snapshot.timestamp_ms() - self.metadata.last_updated_ms < 
-ONE_MINUTE_MS {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Invalid snapshot timestamp {}: before last updated 
timestamp {}",
+                    snapshot.timestamp_ms(),
+                    self.metadata.last_updated_ms
+                ),
+            ));
+        }
+
+        // Mutation happens in next line - must be infallible from here
+        self.changes.push(TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        });
+
+        self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        self.metadata.last_sequence_number = snapshot.sequence_number();
+        self.metadata
+            .snapshots
+            .insert(snapshot.snapshot_id(), snapshot.into());
+
+        Ok(self)
+    }
+
+    /// Add `snapshot` to the table and point an already existing ref at it.
+    ///
+    /// When `ref_name` is `None`, the main branch is used. The current definition
+    /// of the ref is cloned and only its `snapshot_id` is replaced, so every other
+    /// setting stored on the ref (e.g. retention) is carried over unchanged.
+    ///
+    /// # Errors
+    /// - The ref is unknown.
+    /// - `self.add_snapshot` or `self.set_ref` returns an error.
+    pub fn append_snapshot(self, snapshot: Snapshot, ref_name: Option<&str>) -> Result<Self> {
+        let target_ref = ref_name.unwrap_or(MAIN_BRANCH);
+
+        // The ref must be known before anything is modified.
+        let Some(existing) = self.metadata.refs.get(target_ref) else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Cannot append snapshot to unknown ref: '{}'", target_ref),
+            ));
+        };
+
+        let mut updated_ref = existing.clone();
+        updated_ref.snapshot_id = snapshot.snapshot_id();
+
+        self.add_snapshot(snapshot)?.set_ref(target_ref, updated_ref)
+    }
+
+    /// Remove snapshots by their ids from the table metadata.
+    /// Does nothing if a snapshot id is not present.
+    /// Keeps as changes only the snapshots that were actually removed.
+    ///
+    /// Refs whose `snapshot_id` no longer points to a known snapshot are dropped
+    /// as well; no separate change is recorded for them.
+    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
+        // Hash the requested ids once so that each membership test inside `retain`
+        // is O(1) instead of a linear scan over `snapshot_ids`.
+        let ids_to_remove: HashSet<i64> = snapshot_ids.iter().copied().collect();
+        let mut removed_snapshots = Vec::with_capacity(ids_to_remove.len());
+
+        self.metadata.snapshots.retain(|snapshot_id, _| {
+            let remove = ids_to_remove.contains(snapshot_id);
+            if remove {
+                removed_snapshots.push(*snapshot_id);
+            }
+            !remove
+        });
+
+        // Only record a change if at least one snapshot was really removed.
+        if !removed_snapshots.is_empty() {
+            self.changes.push(TableUpdate::RemoveSnapshots {
+                snapshot_ids: removed_snapshots,
+            });
+        }
+
+        // Remove refs that are no longer valid
+        self.metadata
+            .refs
+            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
+
+        self
+    }
+
+    /// Point the ref `ref_name` at the snapshot described by `reference`.
+    ///
+    /// If the ref already exists with an identical definition, the builder is
+    /// returned unchanged and no change is recorded.
+    ///
+    /// Otherwise:
+    /// - If the snapshot was added earlier in this builder's `changes`,
+    ///   `last_updated_ms` is set to the snapshot's timestamp.
+    /// - For the main branch, `current_snapshot_id` is updated and an entry is
+    ///   appended to the snapshot log using `last_updated_ms` (the current UTC time
+    ///   is used if `last_updated_ms` still holds its default value).
+    /// - A `SetSnapshotRef` change is recorded and the ref is stored.
+    ///
+    /// # Errors
+    /// - The snapshot id is unknown.
+    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
+        // Nothing to do when the ref is already defined exactly like this.
+        let is_unchanged = matches!(
+            self.metadata.refs.get(ref_name),
+            Some(existing) if existing.eq(&reference)
+        );
+        if is_unchanged {
+            return Ok(self);
+        }
+
+        // Copy what is needed from the snapshot so the map borrow ends here.
+        let (snapshot_id, snapshot_ts) = match self.metadata.snapshots.get(&reference.snapshot_id) {
+            Some(snapshot) => (snapshot.snapshot_id(), snapshot.timestamp_ms()),
+            None => {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Cannot set '{ref_name}' to unknown snapshot: '{}'",
+                        reference.snapshot_id
+                    ),
+                ));
+            }
+        };
+
+        // Update last_updated_ms to the exact timestamp of the snapshot if it
+        // was added in this commit
+        let added_in_this_commit = self.changes.iter().any(|change| match change {
+            TableUpdate::AddSnapshot { snapshot } => snapshot.snapshot_id() == snapshot_id,
+            _ => false,
+        });
+        if added_in_this_commit {
+            self.metadata.last_updated_ms = snapshot_ts;
+        }
+
+        // Current snapshot id is set only for the main branch
+        if ref_name == MAIN_BRANCH {
+            self.metadata.current_snapshot_id = Some(snapshot_id);
+            if self.metadata.last_updated_ms == i64::default() {
+                self.metadata.last_updated_ms = chrono::Utc::now().timestamp_millis();
+            }
+
+            let timestamp_ms = self.metadata.last_updated_ms;
+            self.metadata.snapshot_log.push(SnapshotLog {
+                snapshot_id,
+                timestamp_ms,
+            });
+        }
+
+        let ref_key = ref_name.to_string();
+        self.changes.push(TableUpdate::SetSnapshotRef {
+            ref_name: ref_key.clone(),
+            reference: reference.clone(),
+        });
+        self.metadata.refs.insert(ref_key, reference);
+
+        Ok(self)
+    }
+
+    /// Remove the ref named `ref_name`.
+    ///
+    /// If `ref_name` is the main branch, the current snapshot id is reset to
+    /// `Self::LAST_ADDED` (expected to be -1 — TODO confirm against the constant)
+    /// and the snapshot log is cleared.
+    ///
+    /// A `RemoveSnapshotRef` change is recorded when the ref existed, and always
+    /// when `ref_name` is the main branch.
+    pub fn remove_ref(mut self, ref_name: &str) -> Self {
+        let is_main = ref_name == MAIN_BRANCH;
+
+        if is_main {
+            self.metadata.current_snapshot_id = Some(i64::from(Self::LAST_ADDED));
+            self.metadata.snapshot_log.clear();
+        }
+
+        let existed = self.metadata.refs.remove(ref_name).is_some();
+        if existed || is_main {
+            self.changes.push(TableUpdate::RemoveSnapshotRef {
+                ref_name: ref_name.to_string(),
+            });
+        }
+
+        self
+    }
+
+    /// Add a schema to the table metadata.
+    ///
+    /// The provided `schema.schema_id` may not be used.
+
+    // ToDo Discuss: Should we add `new_last_column_id` argument?
+    // TLDR; I believe not as it acts as an assertion and its purpose (and 
source) is not clear. We shouldn't add it.
+    //
+    // Schemas can contain only old columns or a mix of old and new columns.
+    // In Java, if `new_last_column_id` is set but too low, the function would fail,
+    // basically hinting at the schema having been built for an older metadata version.
+    // `new_last_column_id` is typically obtained in the schema building process.
+    //
+    // This assertion is not required if the user controls the flow - he knows 
for which
+    // metadata he created a schema. If asserting the `new_last_column_id` was 
semantically important, it should be part of the schema and
+    // not be passed around alongside it.
+    //
+    // Specifying `new_last_column_id` in Java also allows setting
+    // `metadata.last_column_id` to any arbitrary value, even if it's not present as a
+    // column. I believe this to be undesired behavior. This is not possible with the
+    // current Rust interface.
+    //
+    // If the schema is built out of sync with the TableMetadata, for example 
in a REST Catalog setting, the assertion of
+    // the provided `last_column_id` as part of the `TableUpdate::AddSchema` 
is still done in its `.apply` method.
+    pub fn add_schema(mut self, schema: Schema) -> Self {

Review Comment:
   Discussion continues here: 
https://github.com/apache/iceberg-rust/pull/587/files#r1834400220



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to