liurenjie1024 commented on code in PR #1400:
URL: https://github.com/apache/iceberg-rust/pull/1400#discussion_r2126155800


##########
crates/iceberg/src/transaction/action/mod.rs:
##########
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::mem::take;
+
+use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
+
+pub type PendingAction = Box<dyn TransactionAction>;
+
+pub(crate) trait TransactionAction: Sync {
+    /// Commit the changes and apply the changes to the transaction,
+    /// return the transaction with the updated current_table
+    fn commit(self: Box<Self>) -> Result<TransactionActionCommit>;
+}
+
+pub struct TransactionActionCommit {
+    action: Option<PendingAction>,
+    updates: Vec<TableUpdate>,
+    requirements: Vec<TableRequirement>,
+}
+
+impl TransactionActionCommit {
+    pub fn take_action(&mut self) -> Option<PendingAction> {
+        take(&mut self.action)
+    }
+
+    pub fn take_updates(&mut self) -> Vec<TableUpdate> {
+        take(&mut self.updates)
+    }
+
+    pub fn take_requirements(&mut self) -> Vec<TableRequirement> {
+        take(&mut self.requirements)
+    }
+}
+
+pub struct SetLocation {
+    pub location: Option<String>,
+}
+
+impl SetLocation {
+    pub fn new() -> Self {
+        SetLocation { location: None }
+    }
+
+    pub fn set_location(mut self, location: String) -> Self {
+        self.location = Some(location);
+        self
+    }
+}
+

Review Comment:
   I think we still need two methods `apply` and `commit` since they have 
different use cases:
   
   1. `apply` is used to store itself to `Transaction`, and it should not be 
`async`. The case for `UpdateLocation` is too simple, that's why it seems 
useless here. Let's use `FastAppend` action as an example, its normal code path 
should look like:
   ```rust
   let mut tx = table.transaction();
   // This is first action.
   let mut tx = tx.fast_append()
   .append_datafile(...)
   .add_parquet(...)
   .apply();
   
   // This is second action, assume we have update schema action.
   let mut tx = tx.update_schema()
    .add_column("a", DataType.String)
    .apply();
   
   tx.commit()
   ```
   
   2. `commit` method of `TransactionAction` is used to do more complex things: 
validation, generating manifest files, and should return `TableUpdate` and 
`UpdateRequirement`. More importantly, this method is not supposed to be called 
by user, but called by this crate internally.



##########
crates/iceberg/src/transaction/mod.rs:
##########
@@ -25,31 +26,39 @@ use std::cmp::Ordering;
 use std::collections::HashMap;
 use std::mem::discriminant;
 use std::sync::Arc;
+use std::time::Duration;
 
+use backon::{BackoffBuilder, ExponentialBuilder, RetryableWithContext};
+// use tokio_retry2::strategy::ExponentialBackoff;
+// use tokio_retry2::{Retry, RetryError};
 use uuid::Uuid;
 
 use crate::TableUpdate::UpgradeFormatVersion;
 use crate::error::Result;
 use crate::spec::FormatVersion;
 use crate::table::Table;
+use crate::transaction::action::{BoxedTransactionAction, SetLocationAction, 
TransactionAction};
 use crate::transaction::append::FastAppendAction;
 use crate::transaction::sort_order::ReplaceSortOrderAction;
 use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, 
TableUpdate};
 
 /// Table transaction.
-pub struct Transaction<'a> {
-    base_table: &'a Table,
+#[derive(Clone)]
+pub struct Transaction {
+    base_table: Table,
     current_table: Table,
+    actions: Vec<BoxedTransactionAction>,
     updates: Vec<TableUpdate>,
     requirements: Vec<TableRequirement>,

Review Comment:
   We should not store `updates` and `requirements`



##########
crates/iceberg/src/transaction/action/mod.rs:
##########
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use crate::transaction::Transaction;
+use crate::{Error, ErrorKind, Result, TableRequirement, TableUpdate};
+
+pub type BoxedTransactionAction = Arc<dyn TransactionAction>;
+
+#[async_trait]
+pub(crate) trait TransactionAction: Sync + Send {
+    /// Commit the changes and apply the changes to the transaction,
+    /// return the transaction with the updated current_table
+    fn commit(self: Arc<Self>, tx: &mut Transaction) -> Result<()>;

Review Comment:
   ```suggestion
       async fn commit(self: Arc<Self>, table:  &Table) -> Result<ActionCommit>;
   ```
   `ActionCommit` should contains `Vec<UpdateRequirement>` and 
`Vec<TableUpdate>`



##########
crates/iceberg/src/transaction/mod.rs:
##########
@@ -185,19 +195,111 @@ impl<'a> Transaction<'a> {
 
     /// Set the location of table
     pub fn set_location(mut self, location: String) -> Result<Self> {
-        self.apply(vec![TableUpdate::SetLocation { location }], vec![])?;
+        let set_location = SetLocationAction::new().set_location(location);
+        Arc::new(set_location).commit(&mut self)?;
         Ok(self)
     }
 
     /// Commit transaction.
-    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
+    pub async fn commit(&mut self, catalog: Arc<&dyn Catalog>) -> 
Result<Table> {
+        if self.actions.is_empty()
+            || (self.base_table.metadata() == self.current_table.metadata()
+                && self.base_table.metadata_location() == 
self.current_table.metadata_location())
+        {
+            // nothing to commit
+            return Ok(self.current_table.clone());
+        }
+
+        // let retry_strategy = ExponentialBackoff::from_millis(10)
+        //     .take(3);    // limit to 3 retries
+
+        // Retry::spawn(retry_strategy,
+        //             || {
+        //             async {
+        //                 Transaction::do_commit(
+        //                     &mut self.clone(),
+        //                     catalog.clone()).await
+        //             }
+        //         }).await
+

Review Comment:
   I think it's fine as it's an internal detail, let's go on with `backon`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to