c-thiel commented on code in PR #587:
URL: https://github.com/apache/iceberg-rust/pull/587#discussion_r1834831646


##########
crates/iceberg/src/spec/table_metadata_builder.rs:
##########
@@ -0,0 +1,2074 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::{HashMap, HashSet};
+use std::sync::Arc;
+
+use uuid::Uuid;
+
+use super::{
+    BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, 
Schema, SchemaRef,
+    Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, 
TableMetadata,
+    UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, 
MAIN_BRANCH, ONE_MINUTE_MS,
+    PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, 
PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT,
+    RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID,
+};
+use crate::error::{Error, ErrorKind, Result};
+use crate::{TableCreation, TableUpdate};
+
+const FIRST_FIELD_ID: u32 = 1; // NOTE(review): presumably the first field id handed out when ids are re-assigned for a new table; its use is outside this excerpt - confirm.
+
+/// Manipulating table metadata.
+///
+/// Unlike a typical builder pattern, the order of function calls matters: functions are
+/// applied in-order.
+/// All operations applied to the `TableMetadata` are tracked in `changes` as a chronologically
+/// ordered vec of `TableUpdate`.
+/// If an operation does not lead to a change of the `TableMetadata`, the corresponding update
+/// is omitted from `changes`.
+///
+/// Some basic rules:
+/// - `add_schema` must be called before `set_current_schema`.
+/// - If a new partition spec and schema are added, the schema should be added first.
+#[derive(Debug, Clone)]
+pub struct TableMetadataBuilder {
+    /// The metadata under construction, with every change made so far already applied.
+    metadata: TableMetadata,
+    /// Updates applied so far, in the order in which they were applied.
+    changes: Vec<TableUpdate>,
+    /// Id of the schema most recently added to this builder, if any.
+    /// `set_current_schema` resolves the `LAST_ADDED` sentinel to this id.
+    last_added_schema_id: Option<i32>,
+    /// NOTE(review): presumably the partition-spec counterpart of `last_added_schema_id`;
+    /// its use is not visible in this part of the file.
+    last_added_spec_id: Option<i32>,
+    /// NOTE(review): presumably the sort-order counterpart of `last_added_schema_id`;
+    /// its use is not visible in this part of the file.
+    last_added_order_id: Option<i64>,
+    /// Metadata-log entry for the metadata file this builder started from.
+    /// `None` for a new table (builder not created via `new_from_metadata`) or when no
+    /// previous file location was supplied.
+    previous_history_entry: Option<MetadataLog>,
+}
+
+/// Outcome of building a `TableMetadata`, either from scratch or by modifying an existing one.
+#[derive(Debug, Clone, PartialEq)]
+pub struct TableMetadataBuildResult {
+    /// The resulting `TableMetadata`.
+    pub metadata: TableMetadata,
+    /// The updates that were applied to obtain `metadata`.
+    pub changes: Vec<TableUpdate>,
+    /// Metadata log entries that have expired.
+    pub expired_metadata_logs: Vec<MetadataLog>,
+}
+
+impl TableMetadataBuilder {
+    const LAST_ADDED: i32 = -1; // Sentinel id: `set_current_schema` resolves it to the last added schema; `remove_ref` stores it as the current snapshot id when `main` is removed.
+
+    /// Create a `TableMetadata` object from scratch.
+    ///
+    /// This method re-assigns the ids of the fields in the schema, `schema.id`,
+    /// `sort_order.id` and `spec.id`. It should only be used to create new table metadata
+    /// from scratch.
+    ///
+    /// The table gets a freshly generated v7 UUID. The location, schema, partition spec,
+    /// sort order and properties are applied through the regular builder methods, so they
+    /// are recorded in `changes` like any other update.
+    ///
+    /// # Errors
+    /// Propagates any error returned while re-assigning ids or while applying the schema,
+    /// partition spec, sort order or properties to the fresh metadata.
+    pub fn new(
+        schema: Schema,
+        spec: impl Into<UnboundPartitionSpec>,
+        sort_order: SortOrder,
+        location: String,
+        format_version: FormatVersion,
+        properties: HashMap<String, String>,
+    ) -> Result<Self> {
+        // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table.
+        let (fresh_schema, fresh_spec, fresh_sort_order) =
+            Self::reassign_ids(schema, spec.into(), sort_order)?;
+        let schema_id = fresh_schema.schema_id();
+
+        let builder = Self {
+            metadata: TableMetadata {
+                format_version,
+                table_uuid: Uuid::now_v7(),
+                // Overwritten immediately by set_location
+                location: String::new(),
+                last_sequence_number: 0,
+                // Overwritten by build() if not set before
+                last_updated_ms: 0,
+                // Overwritten immediately by add_current_schema
+                last_column_id: -1,
+                // Overwritten immediately by add_current_schema
+                current_schema_id: -1,
+                schemas: HashMap::new(),
+                partition_specs: HashMap::new(),
+                // Overwritten immediately by add_default_partition_spec
+                default_spec: Arc::new(
+                    BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1),
+                ),
+                last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID,
+                properties: HashMap::new(),
+                current_snapshot_id: None,
+                snapshots: HashMap::new(),
+                snapshot_log: vec![],
+                sort_orders: HashMap::new(),
+                metadata_log: vec![],
+                // Overwritten immediately by add_default_sort_order
+                default_sort_order_id: -1,
+                refs: HashMap::default(),
+            },
+            changes: vec![],
+            last_added_schema_id: Some(schema_id),
+            last_added_spec_id: None,
+            last_added_order_id: None,
+            previous_history_entry: None,
+        };
+
+        builder
+            .set_location(location)
+            .add_current_schema(fresh_schema)?
+            .add_default_partition_spec(fresh_spec.into_unbound())?
+            .add_default_sort_order(fresh_sort_order)?
+            .set_properties(properties)
+    }
+
+    /// Creates a new table metadata builder from the given metadata to modify it.
+    ///
+    /// `previous_file_location` is the location where the current version
+    /// of the metadata file is stored. This is used to update the metadata log.
+    /// If `previous_file_location` is `None`, the metadata log will not be updated.
+    /// This should only be used to stage-create tables.
+    #[must_use]
+    pub fn new_from_metadata(
+        previous: TableMetadata,
+        previous_file_location: Option<String>,
+    ) -> Self {
+        // Read the timestamp before `previous` is moved into the builder: the history
+        // entry pairs the previous file location with the previous `last_updated_ms`.
+        let previous_history_entry = previous_file_location.map(|metadata_file| MetadataLog {
+            metadata_file,
+            timestamp_ms: previous.last_updated_ms,
+        });
+
+        Self {
+            previous_history_entry,
+            metadata: previous,
+            changes: Vec::default(),
+            last_added_schema_id: None,
+            last_added_spec_id: None,
+            last_added_order_id: None,
+        }
+    }
+
+    /// Creates a new table metadata builder from the given table creation.
+    ///
+    /// The table name is ignored. A missing partition spec is replaced by an unbound spec
+    /// without fields, a missing sort order by the unsorted order. The metadata is created
+    /// with `FormatVersion::V1`.
+    ///
+    /// # Errors
+    /// - `table_creation.location` is `None`.
+    /// - Any error returned by [`Self::new`].
+    pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> {
+        let TableCreation {
+            name: _,
+            location,
+            schema,
+            partition_spec,
+            sort_order,
+            properties,
+        } = table_creation;
+
+        let location = location.ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                "Can't create table without location",
+            )
+        })?;
+        // Build the fallbacks lazily so they are only constructed when actually needed.
+        let partition_spec = partition_spec.unwrap_or_else(|| UnboundPartitionSpec {
+            spec_id: None,
+            fields: vec![],
+        });
+        let sort_order = sort_order.unwrap_or_else(SortOrder::unsorted_order);
+
+        Self::new(
+            schema,
+            partition_spec,
+            sort_order,
+            location,
+            FormatVersion::V1,
+            properties,
+        )
+    }
+
+    /// Returns the current schema of the metadata under construction, i.e. with all changes applied up to this point.
+    #[inline]
+    pub fn current_schema(&self) -> &SchemaRef {
+        self.metadata.current_schema()
+    }
+
+    /// Returns `last_column_id` of the metadata under construction, with all changes applied up to this point.
+    #[inline]
+    pub fn last_column_id(&self) -> i32 {
+        self.metadata.last_column_id
+    }
+
+    /// Returns `last_updated_ms` (a millisecond timestamp) of the metadata under construction.
+    #[inline]
+    pub fn last_updated_ms(&self) -> i64 {
+        self.metadata.last_updated_ms
+    }
+
+    /// Assigns `uuid` as the table UUID.
+    ///
+    /// Nothing is changed and no update is recorded if the table already carries this UUID.
+    pub fn assign_uuid(mut self, uuid: Uuid) -> Self {
+        let unchanged = self.metadata.table_uuid == uuid;
+        if unchanged {
+            return self;
+        }
+
+        self.metadata.table_uuid = uuid;
+        self.changes.push(TableUpdate::AssignUuid { uuid });
+        self
+    }
+
+    /// Raises the `FormatVersion` of the table metadata.
+    ///
+    /// Passing the version the table already has is a no-op and records no update.
+    ///
+    /// # Errors
+    /// - `format_version` is older than the current format version (downgrades are rejected).
+    pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> {
+        let current = self.metadata.format_version;
+
+        if format_version < current {
+            let message = format!(
+                "Cannot downgrade FormatVersion from {} to {}",
+                current, format_version
+            );
+            return Err(Error::new(ErrorKind::DataInvalid, message));
+        }
+
+        if format_version == current {
+            return Ok(self);
+        }
+
+        self.metadata.format_version = format_version;
+        self.changes
+            .push(TableUpdate::UpgradeFormatVersion { format_version });
+        Ok(self)
+    }
+
+    /// Set properties. If a property already exists, it will be overwritten.
+    ///
+    /// Reserved properties (the keys listed in `RESERVED_PROPERTIES`) are rejected: if any
+    /// of them is present, nothing is changed and an error is returned.
+    /// An empty map is a no-op and records no update.
+    ///
+    /// # Errors
+    /// - `properties` contains at least one reserved property.
+    pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> {
+        // Specified properties that are RESERVED and must not be persisted.
+        // Borrowing the keys is enough: they are only used to build the error message.
+        let reserved_properties = properties
+            .keys()
+            .map(String::as_str)
+            .filter(|key| RESERVED_PROPERTIES.contains(key))
+            .collect::<Vec<_>>();
+
+        if !reserved_properties.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Table properties should not contain reserved properties, but got: [{}]",
+                    reserved_properties.join(", ")
+                ),
+            ));
+        }
+
+        if properties.is_empty() {
+            return Ok(self);
+        }
+
+        // The map is needed twice: merged into the metadata and recorded as an update.
+        self.metadata.properties.extend(properties.clone());
+        self.changes.push(TableUpdate::SetProperties {
+            updates: properties,
+        });
+
+        Ok(self)
+    }
+
+    /// Removes the given keys from the table properties.
+    ///
+    /// Keys that are not present are ignored. Unless `properties` is empty, a
+    /// `RemoveProperties` update listing all given keys is recorded.
+    pub fn remove_properties(mut self, properties: &[String]) -> Self {
+        if properties.is_empty() {
+            return self;
+        }
+
+        for key in properties {
+            self.metadata.properties.remove(key);
+        }
+
+        let removals = properties.to_vec();
+        self.changes.push(TableUpdate::RemoveProperties { removals });
+        self
+    }
+
+    /// Sets the table location after stripping all trailing slashes from it.
+    ///
+    /// If the stripped location equals the current one, nothing is changed and no update
+    /// is recorded.
+    pub fn set_location(mut self, location: String) -> Self {
+        let trimmed = location.trim_end_matches('/');
+        if self.metadata.location == trimmed {
+            return self;
+        }
+
+        let new_location = trimmed.to_string();
+        self.changes.push(TableUpdate::SetLocation {
+            location: new_location.clone(),
+        });
+        self.metadata.location = new_location;
+        self
+    }
+
+    /// Add a snapshot to the table metadata.
+    ///
+    /// On success the snapshot is stored, `last_updated_ms` is set to the snapshot's
+    /// timestamp and `last_sequence_number` to the snapshot's sequence number.
+    ///
+    /// # Errors
+    /// - No schema has been added to the table metadata.
+    /// - No partition spec has been added to the table metadata.
+    /// - No sort order has been added to the table metadata.
+    /// - Snapshot id already exists.
+    /// - For format version > 1: the snapshot has a parent and its sequence number is not
+    ///   higher than the highest sequence number specified so far.
+    /// - The snapshot timestamp is more than one minute before the last snapshot-log entry
+    ///   or before `last_updated_ms`.
+    pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> {
+        if self.metadata.schemas.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Attempting to add a snapshot before a schema is added",
+            ));
+        }
+
+        if self.metadata.partition_specs.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Attempting to add a snapshot before a partition spec is added",
+            ));
+        }
+
+        if self.metadata.sort_orders.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                "Attempting to add a snapshot before a sort order is added",
+            ));
+        }
+
+        if self
+            .metadata
+            .snapshots
+            .contains_key(&snapshot.snapshot_id())
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()),
+            ));
+        }
+
+        // The sequence-number check only applies to snapshots that have a parent.
+        if self.metadata.format_version != FormatVersion::V1
+            && snapshot.sequence_number() <= self.metadata.last_sequence_number
+            && snapshot.parent_snapshot_id().is_some()
+        {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add snapshot with sequence number {} older than last sequence number {}",
+                    snapshot.sequence_number(),
+                    self.metadata.last_sequence_number
+                )
+            ));
+        }
+
+        if let Some(last) = self.metadata.snapshot_log.last() {
+            // commits can happen concurrently from different machines.
+            // A tolerance helps us avoid failure for small clock skew
+            if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS {
+                return Err(Error::new(
+                    ErrorKind::DataInvalid,
+                    format!(
+                        "Invalid snapshot timestamp {}: before last snapshot timestamp {}",
+                        snapshot.timestamp_ms(),
+                        last.timestamp_ms
+                    ),
+                ));
+            }
+        }
+
+        if snapshot.timestamp_ms() - self.metadata.last_updated_ms < -ONE_MINUTE_MS {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Invalid snapshot timestamp {}: before last updated timestamp {}",
+                    snapshot.timestamp_ms(),
+                    self.metadata.last_updated_ms
+                ),
+            ));
+        }
+
+        // Mutation happens in next line - must be infallible from here
+        self.changes.push(TableUpdate::AddSnapshot {
+            snapshot: snapshot.clone(),
+        });
+
+        self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        self.metadata.last_sequence_number = snapshot.sequence_number();
+        self.metadata
+            .snapshots
+            .insert(snapshot.snapshot_id(), snapshot.into());
+
+        Ok(self)
+    }
+
+    /// Append a snapshot to the specified branch.
+    /// If branch is not specified, the snapshot is appended to the main branch.
+    /// The `ref` must already exist. Retention settings from the `ref` are re-used:
+    /// the existing reference is cloned and only its `snapshot_id` is replaced.
+    ///
+    /// # Errors
+    /// - The ref is unknown.
+    /// - Any of the preconditions of `self.add_snapshot` are not met.
+    pub fn append_snapshot(self, snapshot: Snapshot, ref_name: Option<&str>) -> Result<Self> {
+        let ref_name = ref_name.unwrap_or(MAIN_BRANCH);
+        let mut reference = self
+            .metadata
+            .refs
+            .get(ref_name)
+            .ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Cannot append snapshot to unknown ref: '{}'", ref_name),
+                )
+            })?
+            .clone();
+
+        // Point the existing ref at the new snapshot.
+        reference.snapshot_id = snapshot.snapshot_id();
+
+        self.add_snapshot(snapshot)?.set_ref(ref_name, reference)
+    }
+
+    /// Remove snapshots by their ids from the table metadata.
+    /// Does nothing if a snapshot id is not present.
+    /// Keeps as changes only the snapshots that were actually removed.
+    ///
+    /// Refs that point to a snapshot which no longer exists are dropped as well. Note that
+    /// no `RemoveSnapshotRef` update is recorded for refs dropped this way.
+    pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self {
+        // Hash the requested ids once so each lookup inside `retain` is O(1) rather than
+        // a linear scan of the slice.
+        let ids_to_remove: HashSet<i64> = snapshot_ids.iter().copied().collect();
+        let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len());
+
+        self.metadata.snapshots.retain(|k, _| {
+            if ids_to_remove.contains(k) {
+                removed_snapshots.push(*k);
+                false
+            } else {
+                true
+            }
+        });
+
+        if !removed_snapshots.is_empty() {
+            self.changes.push(TableUpdate::RemoveSnapshots {
+                snapshot_ids: removed_snapshots,
+            });
+        }
+
+        // Remove refs that are no longer valid
+        self.metadata
+            .refs
+            .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id));
+
+        self
+    }
+
+    /// Set a reference to a snapshot.
+    ///
+    /// Setting a ref to a value it already has is a no-op and records no update.
+    /// When `ref_name` is the main branch, the current snapshot id is updated and an entry
+    /// is appended to the snapshot log.
+    ///
+    /// # Errors
+    /// - The snapshot id is unknown.
+    pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> {
+        if self
+            .metadata
+            .refs
+            .get(ref_name)
+            .is_some_and(|snap_ref| snap_ref.eq(&reference))
+        {
+            return Ok(self);
+        }
+
+        let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot set '{ref_name}' to unknown snapshot: '{}'",
+                    reference.snapshot_id
+                ),
+            ));
+        };
+
+        // Update last_updated_ms to the exact timestamp of the snapshot if it was added in
+        // this commit
+        let is_added_snapshot = self.changes.iter().any(|update| {
+            matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id())
+        });
+        if is_added_snapshot {
+            self.metadata.last_updated_ms = snapshot.timestamp_ms();
+        }
+
+        // Current snapshot id is set only for the main branch
+        if ref_name == MAIN_BRANCH {
+            self.metadata.current_snapshot_id = Some(snapshot.snapshot_id());
+            // A `last_updated_ms` of 0 means it has not been set yet: fall back to now.
+            if self.metadata.last_updated_ms == i64::default() {
+                self.metadata.last_updated_ms = chrono::Utc::now().timestamp_millis();
+            };
+
+            self.metadata.snapshot_log.push(SnapshotLog {
+                snapshot_id: snapshot.snapshot_id(),
+                timestamp_ms: self.metadata.last_updated_ms,
+            });
+        }
+
+        self.changes.push(TableUpdate::SetSnapshotRef {
+            ref_name: ref_name.to_string(),
+            reference: reference.clone(),
+        });
+        self.metadata.refs.insert(ref_name.to_string(), reference);
+
+        Ok(self)
+    }
+
+    /// Removes the reference named `ref_name`.
+    ///
+    /// For `ref_name='main'` the current snapshot id is set to -1 and the snapshot log is
+    /// cleared. A `RemoveSnapshotRef` update is recorded if the ref existed or if it is the
+    /// main branch.
+    pub fn remove_ref(mut self, ref_name: &str) -> Self {
+        let is_main = ref_name == MAIN_BRANCH;
+
+        if is_main {
+            self.metadata.current_snapshot_id = Some(i64::from(Self::LAST_ADDED));
+            self.metadata.snapshot_log.clear();
+        }
+
+        let existed = self.metadata.refs.remove(ref_name).is_some();
+        if existed || is_main {
+            let ref_name = ref_name.to_string();
+            self.changes.push(TableUpdate::RemoveSnapshotRef { ref_name });
+        }
+
+        self
+    }
+
+    /// Add a schema to the table metadata.
+    ///
+    /// The provided `schema.schema_id` may not be used.
+
+    // ToDo Discuss: Should we add `new_last_column_id` argument?
+    // TLDR; I believe not as it acts as an assertion and its purpose (and 
source) is not clear. We shouldn't add it.
+    //
+    // Schemas can contain only old columns or a mix of old and new columns.
+    // In Java, if `new_last_column_id` set but too low, the function would 
fail, basically hinting at
+    // at the schema having been built for an older metadata version. 
`new_last_column_id` is typically obtained
+    // in the schema building process.
+    //
+    // This assertion is not required if the user controls the flow - he knows 
for which
+    // metadata he created a schema. If asserting the `new_last_column_id` was 
semantically important, it should be part of the schema and
+    // not be passed around alongside it.
+    //
+    // Specifying `new_last_column_id` in java also allows to set 
`metadata.last_column_id` to any arbitrary value
+    // even if its not present as a column. I believe this to be undesired 
behavior. This is not possible with the current Rust interface.
+    //
+    // If the schema is built out of sync with the TableMetadata, for example 
in a REST Catalog setting, the assertion of
+    // the provided `last_column_id` as part of the `TableUpdate::AddSchema` 
is still done in its `.apply` method.
+    pub fn add_schema(mut self, schema: Schema) -> Self {
+        // fn returns a result because I think we should check field-id <-> 
type compatibility if the field-id
+        // is still present in the metadata. This is not done in the Java code.
+        let new_schema_id = self.reuse_or_create_new_schema_id(&schema);
+        let schema_found = self.metadata.schemas.contains_key(&new_schema_id);
+
+        if schema_found {
+            // ToDo Discuss: The Java code is a bit convoluted and I think it 
might be wrong for an edge case.
+            // Why is it wrong: The baseline is, that if something changes the 
state of the builder, it has an effect on it and
+            // must be recorded in the changes.
+            // The Java code might or might not change `lastAddedSchemaId`, 
and does not record this change in `changes`.
+            // Thus, replaying the changes, would lead to a different result 
if a schema is added twice in unfavorable
+            // conditions.
+            // Here we do it differently, but a check from a Java maintainer 
would be nice.
+            if self.last_added_schema_id != Some(new_schema_id) {
+                self.changes.push(TableUpdate::AddSchema {
+                    last_column_id: Some(self.metadata.last_column_id),
+                    schema: schema.clone(),
+                });
+                self.last_added_schema_id = Some(new_schema_id);
+            }
+
+            return self;
+        }
+
+        // New schemas might contain only old columns. In this case 
last_column_id should not be
+        // reduced.
+        self.metadata.last_column_id =
+            std::cmp::max(self.metadata.last_column_id, 
schema.highest_field_id());
+
+        // Set schema-id
+        let schema = match new_schema_id == schema.schema_id() {
+            true => schema,
+            false => schema.with_schema_id(new_schema_id),
+        };
+
+        self.metadata
+            .schemas
+            .insert(new_schema_id, schema.clone().into());
+
+        self.changes.push(TableUpdate::AddSchema {
+            schema,
+            last_column_id: Some(self.metadata.last_column_id),
+        });
+
+        self.last_added_schema_id = Some(new_schema_id);
+
+        self
+    }
+
+    /// Set the current schema id.
+    ///
+    /// Errors:
+    /// - provided `schema_id` is -1 but no schema has been added via 
`add_schema`.
+    /// - No schema with the provided `schema_id` exists.
+    pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> {
+        if schema_id == Self::LAST_ADDED {
+            schema_id = self.last_added_schema_id.ok_or_else(|| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Cannot set current schema to last added schema: no schema 
has been added.",
+                )
+            })?;
+        };
+        let schema_id = schema_id; // Make immutable
+
+        if schema_id == self.metadata.current_schema_id {
+            return Ok(self);
+        }
+
+        let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| {
+            Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot set current schema to unknown schema with id: 
'{}'",
+                    schema_id
+                ),
+            )
+        })?;
+
+        // Old partition specs and sort-orders should be preserved even if 
they are not compatible with the new schema,
+        // so that older metadata can still be interpreted.

Review Comment:
   Oh, but then I think we should also re-check the Java implementation. It is possible to end up with incompatible changes there due to:
   
https://github.com/apache/iceberg/blob/166edc7298825321f677e1e70cf88d7249e8035c/core/src/main/java/org/apache/iceberg/TableMetadata.java#L740
   It's using an unchecked build there, so the spec can still be built even if it is not compatible.
   I copied the comment more or less from Java. The Java comment is:
   
     // build without validation because the schema may have changed in a way that makes this spec
     // invalid. the spec should still be preserved so that older metadata can be interpreted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to