liurenjie1024 commented on code in PR #1433: URL: https://github.com/apache/iceberg-rust/pull/1433#discussion_r2139712392
########## crates/iceberg/src/transaction/mod.rs: ########## @@ -178,27 +164,48 @@ impl Transaction { } } - /// Remove properties in table. - pub fn remove_properties(mut self, keys: Vec<String>) -> Result<Self> { - self.apply( - vec![TableUpdate::RemoveProperties { removals: keys }], - vec![], - )?; - Ok(self) - } - /// Set the location of table - pub fn set_location(mut self, location: String) -> Result<Self> { - self.apply(vec![TableUpdate::SetLocation { location }], vec![])?; - Ok(self) + pub fn update_location(&self) -> UpdateLocationAction { + UpdateLocationAction::new() } /// Commit transaction. - pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> { + pub async fn commit(mut self, catalog: &dyn Catalog) -> Result<Table> { + if self.actions.is_empty() && self.updates.is_empty() { + // nothing to commit + return Ok(self.base_table.clone()); + } + + self.do_commit(catalog).await + } + + async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> { + let base_table_identifier = self.base_table.identifier().to_owned(); + + let refreshed = catalog.load_table(&base_table_identifier.clone()).await?; + + if self.base_table.metadata() != refreshed.metadata() + || self.base_table.metadata_location() != refreshed.metadata_location() + { + // current base is stale, use refreshed as base and re-apply transaction actions + self.base_table = refreshed.clone(); + } + + let current_table = self.base_table.clone(); + + for action in self.actions.clone() { Review Comment: ```suggestion for action in self.actions { ``` You don't need to clone whole collection, just clone each action, this saves a vec allocation. ########## crates/iceberg/src/transaction/upgrade_format_version.rs: ########## @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::cmp::Ordering; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::TableUpdate::UpgradeFormatVersion; +use crate::spec::FormatVersion; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result, TableUpdate}; + +/// A transaction action to upgrade a table's format version. +/// +/// This action is used within a transaction to indicate that the +/// table's format version should be upgraded to a specified version. +/// The location remains optional until explicitly set via [`set_format_version`]. +pub struct UpgradeFormatVersionAction { + format_version: Option<FormatVersion>, +} + +impl UpgradeFormatVersionAction { + /// Creates a new `UpgradeFormatVersionAction` with no version set. + pub fn new() -> Self { + UpgradeFormatVersionAction { + format_version: None, + } + } + + /// Sets the target format version for the upgrade. + /// + /// # Arguments + /// + /// * `format_version` - The version to upgrade the table format to. + /// + /// # Returns + /// + /// Returns the updated `UpgradeFormatVersionAction` with the format version set. 
+ pub fn set_format_version(mut self, format_version: FormatVersion) -> Self { + self.format_version = Some(format_version); + self + } +} + +impl Default for UpgradeFormatVersionAction { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl TransactionAction for UpgradeFormatVersionAction { + fn as_any(self: Arc<Self>) -> Arc<dyn Any> { + self + } + + async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> { + let current_version = table.metadata().format_version(); + let updates: Vec<TableUpdate>; + + if let Some(format_version) = self.format_version { + match current_version.cmp(&format_version) { + Ordering::Greater => { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot downgrade table version from {} to {}", + current_version, format_version + ), + )); + } + Ordering::Less => { + updates = vec![UpgradeFormatVersion { format_version }]; + } + Ordering::Equal => { + // do nothing + updates = vec![]; + } + } + } else { + // error + return Err(Error::new( + ErrorKind::DataInvalid, + "FormatVersion is not set for UpgradeFormatVersionAction!", + )); + } Review Comment: We don't need this, this check is already done here: https://github.com/apache/iceberg-rust/blob/3ea6f4791a2a4e15c4466cff7ccc478c08e2d3e3/crates/iceberg/src/spec/table_metadata_builder.rs#L211 ########## crates/iceberg/src/transaction/update_properties.rs: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Result, TableUpdate}; + +/// A transactional action that updates or removes table properties +/// +/// This action is used to modify key-value pairs in a table's metadata +/// properties during a transaction. It supports setting new values for existing keys +/// or adding new keys, as well as removing existing keys. Each key can only be updated +/// or removed in a single action, not both. +pub struct UpdatePropertiesAction { + updates: HashMap<String, String>, + removals: HashSet<String>, +} + +impl UpdatePropertiesAction { + /// Creates a new [`UpdatePropertiesAction`] with no updates or removals. + pub fn new() -> Self { + UpdatePropertiesAction { + updates: HashMap::default(), + removals: HashSet::default(), + } + } + + /// Adds a key-value pair to the update set of this action. + /// + /// # Panics + /// + /// Panics if the key was previously marked for removal. + /// + /// # Arguments + /// + /// * `key` - The property key to update. + /// * `value` - The new value to associate with the key. + /// + /// # Returns + /// + /// The updated [`UpdatePropertiesAction`] with the key-value pair added to the update set. + pub fn set(mut self, key: String, value: String) -> Self { + assert!(!self.removals.contains(&key)); Review Comment: We should not use `assert!` here since this is a user-passed argument.
We could do this check in `apply` ########## crates/iceberg/src/transaction/upgrade_format_version.rs: ########## @@ -0,0 +1,133 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::cmp::Ordering; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::TableUpdate::UpgradeFormatVersion; +use crate::spec::FormatVersion; +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Error, ErrorKind, Result, TableUpdate}; + +/// A transaction action to upgrade a table's format version. +/// +/// This action is used within a transaction to indicate that the +/// table's format version should be upgraded to a specified version. +/// The location remains optional until explicitly set via [`set_format_version`]. 
+pub struct UpgradeFormatVersionAction { Review Comment: This may be unnecessary; we could put it in `UpdateProperty`. See what's done in Java: https://github.com/apache/iceberg/blob/afda8be25652d44d9339a79c6797b6bf20c55bd6/core/src/main/java/org/apache/iceberg/PropertiesUpdate.java#L73 ########## crates/iceberg/src/transaction/action.rs: ########## @@ -35,6 +37,9 @@ pub type BoxedTransactionAction = Arc<dyn TransactionAction>; /// to modify the table metadata. #[async_trait] pub(crate) trait TransactionAction: Sync + Send { + /// Returns the action as [`Any`] so it can be downcast to concrete types later + fn as_any(self: Arc<Self>) -> Arc<dyn Any>; Review Comment: ```suggestion fn as_any(&self) -> &Any { self } ``` ########## crates/iceberg/src/transaction/mod.rs: ########## @@ -104,32 +109,13 @@ impl Transaction { } /// Sets table to a new version. - pub fn upgrade_table_version(mut self, format_version: FormatVersion) -> Result<Self> { - let current_version = self.current_table.metadata().format_version(); - match current_version.cmp(&format_version) { - Ordering::Greater => { - return Err(Error::new( - ErrorKind::DataInvalid, - format!( - "Cannot downgrade table version from {} to {}", - current_version, format_version - ), - )); - } - Ordering::Less => { - self.apply(vec![UpgradeFormatVersion { format_version }], vec![])?; - } - Ordering::Equal => { - // Do nothing. - } - } - Ok(self) + pub fn upgrade_table_version(&self) -> UpgradeFormatVersionAction { + UpgradeFormatVersionAction::new() } /// Update table's property.
- pub fn set_properties(mut self, props: HashMap<String, String>) -> Result<Self> { - self.apply(vec![TableUpdate::SetProperties { updates: props }], vec![])?; - Ok(self) + pub fn update_properties(&self) -> UpdatePropertiesAction { Review Comment: ```suggestion pub fn update_table_properties(&self) -> UpdatePropertiesAction { ``` ########## crates/iceberg/src/transaction/update_properties.rs: ########## @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use async_trait::async_trait; + +use crate::table::Table; +use crate::transaction::action::{ActionCommit, TransactionAction}; +use crate::{Result, TableUpdate}; + +/// A transactional action that updates or removes table properties +/// +/// This action is used to modify key-value pairs in a table's metadata +/// properties during a transaction. It supports setting new values for existing keys +/// or adding new keys, as well as removing existing keys. Each key can only be updated +/// or removed in a single action, not both. 
+pub struct UpdatePropertiesAction { + updates: HashMap<String, String>, + removals: HashSet<String>, +} + +impl UpdatePropertiesAction { + /// Creates a new [`UpdatePropertiesAction`] with no updates or removals. + pub fn new() -> Self { + UpdatePropertiesAction { + updates: HashMap::default(), + removals: HashSet::default(), + } + } + + /// Adds a key-value pair to the update set of this action. + /// + /// # Panics + /// + /// Panics if the key was previously marked for removal. + /// + /// # Arguments + /// + /// * `key` - The property key to update. + /// * `value` - The new value to associate with the key. + /// + /// # Returns + /// + /// The updated [`UpdatePropertiesAction`] with the key-value pair added to the update set. + pub fn set(mut self, key: String, value: String) -> Self { + assert!(!self.removals.contains(&key)); + self.updates.insert(key, value); + self + } + + /// Adds a key to the removal set of this action. + /// + /// # Panics + /// + /// Panics if the key was already marked for update. + /// + /// # Arguments + /// + /// * `key` - The property key to remove. + /// + /// # Returns + /// + /// The updated [`UpdatePropertiesAction`] with the key added to the removal set. + pub fn remove(mut self, key: String) -> Self { + assert!(!self.updates.contains_key(&key)); Review Comment: Ditto. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org