c-thiel commented on code in PR #587: URL: https://github.com/apache/iceberg-rust/pull/587#discussion_r1834836340
########## crates/iceberg/src/spec/table_metadata_builder.rs: ########## @@ -0,0 +1,2074 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use uuid::Uuid; + +use super::{ + BoundPartitionSpec, FormatVersion, MetadataLog, PartitionSpecBuilder, Schema, SchemaRef, + Snapshot, SnapshotLog, SnapshotReference, SortOrder, SortOrderRef, TableMetadata, + UnboundPartitionSpec, DEFAULT_PARTITION_SPEC_ID, DEFAULT_SCHEMA_ID, MAIN_BRANCH, ONE_MINUTE_MS, + PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX, PROPERTY_METADATA_PREVIOUS_VERSIONS_MAX_DEFAULT, + RESERVED_PROPERTIES, UNPARTITIONED_LAST_ASSIGNED_ID, +}; +use crate::error::{Error, ErrorKind, Result}; +use crate::{TableCreation, TableUpdate}; + +const FIRST_FIELD_ID: u32 = 1; + +/// Manipulating table metadata. +/// +/// For this builder the order of called functions matters. Functions are applied in-order. +/// All operations applied to the `TableMetadata` are tracked in `changes` as a chronologically +/// ordered vec of `TableUpdate`. +/// If an operation does not lead to a change of the `TableMetadata`, the corresponding update +/// is omitted from `changes`. +/// +/// Unlike a typical builder pattern, the order of function calls matters. +/// Some basic rules: +/// - `add_schema` must be called before `set_current_schema`. +/// - If a new partition spec and schema are added, the schema should be added first. +#[derive(Debug, Clone)] +pub struct TableMetadataBuilder { + metadata: TableMetadata, + changes: Vec<TableUpdate>, + last_added_schema_id: Option<i32>, + last_added_spec_id: Option<i32>, + last_added_order_id: Option<i64>, + // None if this is a new table (from_metadata) method not used + previous_history_entry: Option<MetadataLog>, +} + +#[derive(Debug, Clone, PartialEq)] +/// Result of modifying or creating a `TableMetadata`. +pub struct TableMetadataBuildResult { + /// The new `TableMetadata`. + pub metadata: TableMetadata, + /// The changes that were applied to the metadata. + pub changes: Vec<TableUpdate>, + /// Expired metadata logs + pub expired_metadata_logs: Vec<MetadataLog>, +} + +impl TableMetadataBuilder { + const LAST_ADDED: i32 = -1; + + /// Create a `TableMetadata` object from scratch. + /// + /// This method re-assign ids of fields in the schema, schema.id, sort_order.id and + /// spec.id. It should only be used to create new table metadata from scratch. + pub fn new( + schema: Schema, + spec: impl Into<UnboundPartitionSpec>, + sort_order: SortOrder, + location: String, + format_version: FormatVersion, + properties: HashMap<String, String>, + ) -> Result<Self> { + // Re-assign field_ids, schema.id, sort_order.id and spec.id for a new table. 
+ let (fresh_schema, fresh_spec, fresh_sort_order) = + Self::reassign_ids(schema, spec.into(), sort_order)?; + let schema_id = fresh_schema.schema_id(); + + let builder = Self { + metadata: TableMetadata { + format_version, + table_uuid: Uuid::now_v7(), + location: "".to_string(), // Overwritten immediately by set_location + last_sequence_number: 0, + last_updated_ms: 0, // Overwritten by build() if not set before + last_column_id: -1, // Overwritten immediately by add_current_schema + current_schema_id: -1, // Overwritten immediately by add_current_schema + schemas: HashMap::new(), + partition_specs: HashMap::new(), + default_spec: Arc::new( + BoundPartitionSpec::unpartition_spec(fresh_schema.clone()).with_spec_id(-1), + ), // Overwritten immediately by add_default_partition_spec + last_partition_id: UNPARTITIONED_LAST_ASSIGNED_ID, + properties: HashMap::new(), + current_snapshot_id: None, + snapshots: HashMap::new(), + snapshot_log: vec![], + sort_orders: HashMap::new(), + metadata_log: vec![], + default_sort_order_id: -1, // Overwritten immediately by add_default_sort_order + refs: HashMap::default(), + }, + changes: vec![], + last_added_schema_id: Some(schema_id), + last_added_spec_id: None, + last_added_order_id: None, + previous_history_entry: None, + }; + + builder + .set_location(location) + .add_current_schema(fresh_schema)? + .add_default_partition_spec(fresh_spec.into_unbound())? + .add_default_sort_order(fresh_sort_order)? + .set_properties(properties) + } + + /// Creates a new table metadata builder from the given metadata to modify it. + + /// `previous_file_location` is the location where the current version + /// of the metadata file is stored. This is used to update the metadata log. + /// If `previous_file_location` is `None`, the metadata log will not be updated. + /// This should only be used to stage-create tables. + #[must_use] + pub fn new_from_metadata( + previous: TableMetadata, + previous_file_location: Option<String>, + ) -> Self { + Self { + previous_history_entry: previous_file_location.map(|l| MetadataLog { + metadata_file: l, + timestamp_ms: previous.last_updated_ms, + }), + metadata: previous, + changes: Vec::default(), + last_added_schema_id: None, + last_added_spec_id: None, + last_added_order_id: None, + } + } + + /// Creates a new table metadata builder from the given table creation. + pub fn from_table_creation(table_creation: TableCreation) -> Result<Self> { + let TableCreation { + name: _, + location, + schema, + partition_spec, + sort_order, + properties, + } = table_creation; + + let location = location.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Can't create table without location", + ) + })?; + let partition_spec = partition_spec.unwrap_or(UnboundPartitionSpec { + spec_id: None, + fields: vec![], + }); + + Self::new( + schema, + partition_spec, + sort_order.unwrap_or(SortOrder::unsorted_order()), + location, + FormatVersion::V1, + properties, + ) + } + + /// Get the current schema with all changes applied up to this point. + #[inline] + pub fn current_schema(&self) -> &SchemaRef { + self.metadata.current_schema() + } + + /// Get the current last column id + #[inline] + pub fn last_column_id(&self) -> i32 { + self.metadata.last_column_id + } + + /// Get the current last updated timestamp + #[inline] + pub fn last_updated_ms(&self) -> i64 { + self.metadata.last_updated_ms + } + + /// Changes uuid of table metadata.
+ pub fn assign_uuid(mut self, uuid: Uuid) -> Self { + if self.metadata.table_uuid != uuid { + self.metadata.table_uuid = uuid; + self.changes.push(TableUpdate::AssignUuid { uuid }); + } + + self + } + + /// Upgrade `FormatVersion`. Downgrades are not allowed. + /// + /// # Errors + /// - Cannot downgrade to older format versions. + pub fn upgrade_format_version(mut self, format_version: FormatVersion) -> Result<Self> { + if format_version < self.metadata.format_version { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot downgrade FormatVersion from {} to {}", + self.metadata.format_version, format_version + ), + )); + } + + if format_version != self.metadata.format_version { + self.metadata.format_version = format_version; + self.changes + .push(TableUpdate::UpgradeFormatVersion { format_version }); + } + + Ok(self) + } + + /// Set properties. If a property already exists, it will be overwritten. + /// + /// If a reserved property is set, the corresponding action is performed and the property is not persisted. + /// Currently the following reserved properties are supported: + /// * format-version: Set the format version of the table. + /// + /// # Errors + /// - If format-version property is set to a lower version than the current format version. + pub fn set_properties(mut self, properties: HashMap<String, String>) -> Result<Self> { + // List of specified properties that are RESERVED and should not be persisted. + let reserved_properties = properties + .keys() + .filter(|key| RESERVED_PROPERTIES.contains(&key.as_str())) + .map(ToString::to_string) + .collect::<Vec<_>>(); + + if !reserved_properties.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Table properties should not contain reserved properties, but got: [{}]", + reserved_properties.join(", ") + ), + )); + } + + if properties.is_empty() { + return Ok(self); + } + + self.metadata.properties.extend(properties.clone()); + self.changes.push(TableUpdate::SetProperties { + updates: properties, + }); + + Ok(self) + } + + /// Remove properties from the table metadata. + /// Does nothing if the key is not present. + pub fn remove_properties(mut self, properties: &[String]) -> Self { + for property in properties { + self.metadata.properties.remove(property); + } + + if !properties.is_empty() { + self.changes.push(TableUpdate::RemoveProperties { + removals: properties.to_vec(), + }); + } + + self + } + + /// Set the location of the table metadata, stripping any trailing slashes. + pub fn set_location(mut self, location: String) -> Self { + let location = location.trim_end_matches('/').to_string(); + if self.metadata.location != location { + self.changes.push(TableUpdate::SetLocation { + location: location.clone(), + }); + self.metadata.location = location; + } + + self + } + + /// Add a snapshot to the table metadata. + /// + /// # Errors + /// - No schema has been added to the table metadata. + /// - No partition spec has been added to the table metadata. + /// - No sort order has been added to the table metadata. + /// - Snapshot id already exists. + /// - For format version > 1: the sequence number of the snapshot is lower than the highest sequence number specified so far.
+ pub fn add_snapshot(mut self, snapshot: Snapshot) -> Result<Self> { + if self.metadata.partition_specs.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Attempting to add a snapshot before a partition spec is added", + )); + } + + if self.metadata.sort_orders.is_empty() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Attempting to add a snapshot before a sort order is added", + )); + } + + if self + .metadata + .snapshots + .contains_key(&snapshot.snapshot_id()) + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Snapshot already exists for: '{}'", snapshot.snapshot_id()), + )); + } + + if self.metadata.format_version != FormatVersion::V1 + && snapshot.sequence_number() <= self.metadata.last_sequence_number + && snapshot.parent_snapshot_id().is_some() + { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add snapshot with sequence number {} older than last sequence number {}", + snapshot.sequence_number(), + self.metadata.last_sequence_number + ) + )); + } + + if let Some(last) = self.metadata.snapshot_log.last() { + // commits can happen concurrently from different machines. + // A tolerance helps us avoid failure for small clock skew + if snapshot.timestamp_ms() - last.timestamp_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid snapshot timestamp {}: before last snapshot timestamp {}", + snapshot.timestamp_ms(), + last.timestamp_ms + ), + )); + } + } + + if snapshot.timestamp_ms() - self.metadata.last_updated_ms < -ONE_MINUTE_MS { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Invalid snapshot timestamp {}: before last updated timestamp {}", + snapshot.timestamp_ms(), + self.metadata.last_updated_ms + ), + )); + } + + // Mutation happens in next line - must be infallible from here + self.changes.push(TableUpdate::AddSnapshot { + snapshot: snapshot.clone(), + }); + + self.metadata.last_updated_ms = snapshot.timestamp_ms(); + self.metadata.last_sequence_number = snapshot.sequence_number(); + self.metadata + .snapshots + .insert(snapshot.snapshot_id(), snapshot.into()); + + Ok(self) + } + + /// Append a snapshot to the specified branch. + /// If branch is not specified, the snapshot is appended to the main branch. + /// The `ref` must already exist. Retention settings from the `ref` are re-used. + /// + /// # Errors + /// - The ref is unknown. + /// - Any of the preconditions of `self.add_snapshot` are not met. + pub fn append_snapshot(self, snapshot: Snapshot, ref_name: Option<&str>) -> Result<Self> { + let ref_name = ref_name.unwrap_or(MAIN_BRANCH); + let mut reference = self + .metadata + .refs + .get(ref_name) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Cannot append snapshot to unknown ref: '{}'", ref_name), + ) + })? + .clone(); + + reference.snapshot_id = snapshot.snapshot_id(); + + self.add_snapshot(snapshot)?.set_ref(ref_name, reference) + } + + /// Remove snapshots by its ids from the table metadata. + /// Does nothing if a snapshot id is not present. + /// Keeps as changes only the snapshots that were actually removed. 
+ pub fn remove_snapshots(mut self, snapshot_ids: &[i64]) -> Self { + let mut removed_snapshots = Vec::with_capacity(snapshot_ids.len()); + + self.metadata.snapshots.retain(|k, _| { + if snapshot_ids.contains(k) { + removed_snapshots.push(*k); + false + } else { + true + } + }); + + if !removed_snapshots.is_empty() { + self.changes.push(TableUpdate::RemoveSnapshots { + snapshot_ids: removed_snapshots, + }); + } + + // Remove refs that are no longer valid + self.metadata + .refs + .retain(|_, v| self.metadata.snapshots.contains_key(&v.snapshot_id)); + + self + } + + /// Set a reference to a snapshot. + /// + /// # Errors + /// - The snapshot id is unknown. + pub fn set_ref(mut self, ref_name: &str, reference: SnapshotReference) -> Result<Self> { + if self + .metadata + .refs + .get(ref_name) + .is_some_and(|snap_ref| snap_ref.eq(&reference)) + { + return Ok(self); + } + + let Some(snapshot) = self.metadata.snapshots.get(&reference.snapshot_id) else { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot set '{ref_name}' to unknown snapshot: '{}'", + reference.snapshot_id + ), + )); + }; + + // Update last_updated_ms to the exact timestamp of the snapshot if it was added in this commit + let is_added_snapshot = self.changes.iter().any(|update| { + matches!(update, TableUpdate::AddSnapshot { snapshot: snap } if snap.snapshot_id() == snapshot.snapshot_id()) + }); + if is_added_snapshot { + self.metadata.last_updated_ms = snapshot.timestamp_ms(); + } + + // Current snapshot id is set only for the main branch + if ref_name == MAIN_BRANCH { + self.metadata.current_snapshot_id = Some(snapshot.snapshot_id()); + if self.metadata.last_updated_ms == i64::default() { + self.metadata.last_updated_ms = chrono::Utc::now().timestamp_millis(); + }; + + self.metadata.snapshot_log.push(SnapshotLog { + snapshot_id: snapshot.snapshot_id(), + timestamp_ms: self.metadata.last_updated_ms, + }); + } + + self.changes.push(TableUpdate::SetSnapshotRef { + ref_name: ref_name.to_string(), + reference: reference.clone(), + }); + self.metadata.refs.insert(ref_name.to_string(), reference); + + Ok(self) + } + + /// Remove a reference + /// + /// If `ref_name='main'` the current snapshot id is set to -1. + pub fn remove_ref(mut self, ref_name: &str) -> Self { + if ref_name == MAIN_BRANCH { + self.metadata.current_snapshot_id = Some(i64::from(Self::LAST_ADDED)); + self.metadata.snapshot_log.clear(); + } + + if self.metadata.refs.remove(ref_name).is_some() || ref_name == MAIN_BRANCH { + self.changes.push(TableUpdate::RemoveSnapshotRef { + ref_name: ref_name.to_string(), + }); + } + + self + } + + /// Add a schema to the table metadata. + /// + /// The provided `schema.schema_id` may not be used. + + // ToDo Discuss: Should we add `new_last_column_id` argument? + // TLDR; I believe not as it acts as an assertion and its purpose (and source) is not clear. We shouldn't add it. + // + // Schemas can contain only old columns or a mix of old and new columns. + // In Java, if `new_last_column_id` set but too low, the function would fail, basically hinting at + // at the schema having been built for an older metadata version. `new_last_column_id` is typically obtained + // in the schema building process. + // + // This assertion is not required if the user controls the flow - he knows for which + // metadata he created a schema. If asserting the `new_last_column_id` was semantically important, it should be part of the schema and + // not be passed around alongside it. 
+ // + // Specifying `new_last_column_id` in java also allows to set `metadata.last_column_id` to any arbitrary value + // even if its not present as a column. I believe this to be undesired behavior. This is not possible with the current Rust interface. + // + // If the schema is built out of sync with the TableMetadata, for example in a REST Catalog setting, the assertion of + // the provided `last_column_id` as part of the `TableUpdate::AddSchema` is still done in its `.apply` method. + pub fn add_schema(mut self, schema: Schema) -> Self { + // fn returns a result because I think we should check field-id <-> type compatibility if the field-id + // is still present in the metadata. This is not done in the Java code. + let new_schema_id = self.reuse_or_create_new_schema_id(&schema); + let schema_found = self.metadata.schemas.contains_key(&new_schema_id); + + if schema_found { + // ToDo Discuss: The Java code is a bit convoluted and I think it might be wrong for an edge case. + // Why is it wrong: The baseline is, that if something changes the state of the builder, it has an effect on it and + // must be recorded in the changes. + // The Java code might or might not change `lastAddedSchemaId`, and does not record this change in `changes`. + // Thus, replaying the changes, would lead to a different result if a schema is added twice in unfavorable + // conditions. + // Here we do it differently, but a check from a Java maintainer would be nice. + if self.last_added_schema_id != Some(new_schema_id) { + self.changes.push(TableUpdate::AddSchema { + last_column_id: Some(self.metadata.last_column_id), + schema: schema.clone(), + }); + self.last_added_schema_id = Some(new_schema_id); + } + + return self; + } + + // New schemas might contain only old columns. In this case last_column_id should not be + // reduced. + self.metadata.last_column_id = + std::cmp::max(self.metadata.last_column_id, schema.highest_field_id()); + + // Set schema-id + let schema = match new_schema_id == schema.schema_id() { + true => schema, + false => schema.with_schema_id(new_schema_id), + }; + + self.metadata + .schemas + .insert(new_schema_id, schema.clone().into()); + + self.changes.push(TableUpdate::AddSchema { + schema, + last_column_id: Some(self.metadata.last_column_id), + }); + + self.last_added_schema_id = Some(new_schema_id); + + self + } + + /// Set the current schema id. + /// + /// Errors: + /// - provided `schema_id` is -1 but no schema has been added via `add_schema`. + /// - No schema with the provided `schema_id` exists. + pub fn set_current_schema(mut self, mut schema_id: i32) -> Result<Self> { + if schema_id == Self::LAST_ADDED { + schema_id = self.last_added_schema_id.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Cannot set current schema to last added schema: no schema has been added.", + ) + })?; + }; + let schema_id = schema_id; // Make immutable + + if schema_id == self.metadata.current_schema_id { + return Ok(self); + } + + let _schema = self.metadata.schemas.get(&schema_id).ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot set current schema to unknown schema with id: '{}'", + schema_id + ), + ) + })?; + + // Old partition specs and sort-orders should be preserved even if they are not compatible with the new schema, + // so that older metadata can still be interpreted. + // Default partition spec and sort order are checked in the build() method + // which allows other default partition specs and sort orders to be set before the build. 
+ + self.metadata.current_schema_id = schema_id; + + if self.last_added_schema_id == Some(schema_id) { + self.changes.push(TableUpdate::SetCurrentSchema { + schema_id: Self::LAST_ADDED, + }); + } else { + self.changes + .push(TableUpdate::SetCurrentSchema { schema_id }); + } + + Ok(self) + } + + /// Add a schema and set it as the current schema. + pub fn add_current_schema(self, schema: Schema) -> Result<Self> { + self.add_schema(schema).set_current_schema(Self::LAST_ADDED) + } + + /// Add a partition spec to the table metadata. + /// + /// The spec is bound eagerly to the current schema. + /// If a schema is added in the same set of changes, the schema should be added first. + /// + /// Even if `unbound_spec.spec_id` is provided as `Some`, it may not be used. + /// + /// # Errors + /// - The partition spec cannot be bound to the current schema. + /// - The partition spec has non-sequential field ids and the table format version is 1. + pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result<Self> { + let schema = self.get_current_schema()?.clone(); + let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)? + .with_last_assigned_field_id(self.metadata.last_partition_id) + .build()?; + + let new_spec_id = self.reuse_or_create_new_spec_id(&spec); + let spec_found = self.metadata.partition_specs.contains_key(&new_spec_id); + let spec = spec.with_spec_id(new_spec_id); + let unbound_spec = unbound_spec.with_spec_id(new_spec_id); + + if spec_found { + if self.last_added_spec_id != Some(new_spec_id) { + self.changes + .push(TableUpdate::AddSpec { spec: unbound_spec }); + self.last_added_spec_id = Some(new_spec_id); + } + + return Ok(self); + } + + if self.metadata.format_version <= FormatVersion::V1 && !spec.has_sequential_ids() { + return Err(Error::new( + ErrorKind::DataInvalid, + "Cannot add partition spec with non-sequential field ids to format version 1 table", + )); + } + + let highest_field_id = spec + .highest_field_id() + .unwrap_or(UNPARTITIONED_LAST_ASSIGNED_ID); + self.metadata + .partition_specs + .insert(new_spec_id, Arc::new(spec.into())); + self.changes + .push(TableUpdate::AddSpec { spec: unbound_spec }); + + self.last_added_spec_id = Some(new_spec_id); + self.metadata.last_partition_id = + std::cmp::max(self.metadata.last_partition_id, highest_field_id); + + Ok(self) + } + + /// Set the default partition spec. + /// + /// # Errors + /// - spec_id is -1 but no spec has been added via `add_partition_spec`. + /// - No partition spec with the provided `spec_id` exists. + pub fn set_default_partition_spec(mut self, mut spec_id: i32) -> Result<Self> { + if spec_id == Self::LAST_ADDED { + spec_id = self.last_added_spec_id.ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + "Cannot set default partition spec to last added spec: no spec has been added.", + ) + })?; + } + + if self.metadata.default_spec.spec_id() == spec_id { + return Ok(self); + } + + if !self.metadata.partition_specs.contains_key(&spec_id) { + return Err(Error::new( + ErrorKind::DataInvalid, + format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",), + )); + } + + let schemaless_spec = + self.metadata + .partition_specs + .get(&spec_id) + .ok_or_else(|| { + Error::new( + ErrorKind::DataInvalid, + format!("Cannot set default partition spec to unknown spec with id: '{spec_id}'",), + ) + })? 
+ .clone(); + let spec = + Arc::unwrap_or_clone(schemaless_spec).bind(self.get_current_schema()?.clone())?; + self.metadata.default_spec = Arc::new(spec); + + if self.last_added_spec_id == Some(spec_id) { + self.changes.push(TableUpdate::SetDefaultSpec { + spec_id: Self::LAST_ADDED, + }); + } else { + self.changes.push(TableUpdate::SetDefaultSpec { spec_id }); + } + + Ok(self) + } + + /// Add a partition spec and set it as the default + pub fn add_default_partition_spec(self, unbound_spec: UnboundPartitionSpec) -> Result<Self> { + self.add_partition_spec(unbound_spec)? + .set_default_partition_spec(Self::LAST_ADDED) + } + + /// Add a sort order to the table metadata. + /// + /// The sort order is bound eagerly to the current schema and must be valid for it. + /// If a schema is added in the same set of changes, the schema should be added first. + /// + /// Even if `sort_order.order_id` is provided, it may not be used. + /// + /// # Errors + /// - Sort order id to add already exists. + /// - Sort order is incompatible with the current schema. + pub fn add_sort_order(mut self, sort_order: SortOrder) -> Result<Self> { + let new_order_id = self.reuse_or_create_new_sort_id(&sort_order); + let sort_order_found = self.metadata.sort_orders.contains_key(&new_order_id); + + if sort_order_found { + if self.last_added_order_id != Some(new_order_id) { + self.changes.push(TableUpdate::AddSortOrder { + sort_order: sort_order.clone(), + }); + self.last_added_order_id = Some(new_order_id); + } + + return Ok(self); + } + + // ToDo Discuss: Java builds a fresh spec here:

Review Comment: That is actually a very important point, and I think we should fix it in Rust. Currently in Rust we use the `SchemaBuilder` for everything. To get one, we take an existing `Schema` and call its `into_builder(self)` method. I believe we should change this to `into_builder(self, last_column_id: Option<i64>)` so that users are forced to think about the `last_column_id`. Most likely they will want to use the `last_column_id` of the metadata they got the `Schema` from.
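To make the proposal concrete, here is a rough, self-contained sketch of what such a signature could look like. This is not the existing `iceberg-rust` `SchemaBuilder` API; every type below is a simplified stand-in, and the counter is shown as `i32` (matching `TableMetadata::last_column_id` in this PR) rather than the `i64` written above:

```rust
// Hypothetical sketch only -- not the real iceberg-rust API. Types are
// simplified stand-ins used to illustrate the proposed signature.
struct Schema {
    fields: Vec<(i32, String)>, // (field_id, name)
    highest_field_id: i32,
}

struct SchemaBuilder {
    fields: Vec<(i32, String)>,
    last_column_id: i32, // counter from which new field ids are assigned
}

impl Schema {
    /// Proposed shape: the caller states which `last_column_id` new fields
    /// should be assigned from, typically the one recorded in the
    /// `TableMetadata` this schema was loaded from. `None` falls back to the
    /// schema's own highest field id.
    fn into_builder(self, last_column_id: Option<i32>) -> SchemaBuilder {
        let last_column_id = last_column_id.unwrap_or(self.highest_field_id);
        SchemaBuilder {
            fields: self.fields,
            last_column_id,
        }
    }
}

impl SchemaBuilder {
    fn add_field(mut self, name: &str) -> Self {
        // New columns never reuse ids, not even ids that only ever existed in
        // older schema versions of the table.
        self.last_column_id += 1;
        self.fields.push((self.last_column_id, name.to_string()));
        self
    }
}

fn main() {
    let schema = Schema {
        fields: vec![(1, "id".into()), (2, "data".into())],
        highest_field_id: 2,
    };
    // Suppose a column with id 3 was added and dropped in an earlier schema
    // version: the table-level counter remembers that, the schema does not.
    let metadata_last_column_id = 3;
    let builder = schema
        .into_builder(Some(metadata_last_column_id))
        .add_field("ts"); // assigned id 4, not 3
    assert_eq!(builder.fields.last(), Some(&(4, "ts".to_string())));
}
```

The point of threading the counter through explicitly is that field-id assignment stays monotonic across the whole table history, even when the schema in hand no longer contains the highest id ever assigned.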
Review Comment: CC @liurenjie1024

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org