Fokko commented on code in PR #1534:
URL: https://github.com/apache/iceberg-python/pull/1534#discussion_r1944888949


##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1066,97 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            self: the target Iceberg table to execute the upsert on
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on. These are essentially analogous to primary keys
+            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
+
+            Example Use Cases:
+                Case 1: Both Parameters = True (Full Upsert)
+                Existing row found → Update it
+                New row found → Insert it
+
+                Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
+                Existing row found → Do nothing (no updates)
+                New row found → Insert it
+
+                Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
+                Existing row found → Update it
+                New row found → Do nothing (no inserts)
+
+                Case 4: Both Parameters = False (No Merge Effect)
+                Existing row found → Do nothing
+                New row found → Do nothing
+                (Function effectively does nothing)
+
+
+        Returns: a UpsertResult class (contains details of rows updated and inserted)
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == False:
+            raise Exception('no upsert options selected...exiting')

Review Comment:
   I think a `ValueError` might be more appropriate here. From the
   [docs](https://docs.python.org/3/library/exceptions.html#ValueError): "Raised
   when an operation or function receives an argument that has the right type but
   an inappropriate value, and the situation is not described by a more precise
   exception such as
   [IndexError](https://docs.python.org/3/library/exceptions.html#IndexError)."
   
   ```suggestion
               raise ValueError('no upsert options selected...exiting')
   ```
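
For illustration only, here is a minimal sketch of the guard with `ValueError`. The helper name `validate_upsert_flags` is hypothetical and not part of the PR; it only isolates the check so the behavior is easy to see.

```python
def validate_upsert_flags(when_matched_update_all: bool, when_not_matched_insert_all: bool) -> None:
    """Reject the one flag combination that would make the upsert a no-op.

    Hypothetical helper: in the PR this check lives inline in `upsert`.
    """
    if not when_matched_update_all and not when_not_matched_insert_all:
        # Both arguments have the right type (bool) but an unusable value,
        # which is the situation ValueError is documented for.
        raise ValueError("no upsert options selected...exiting")


# A caller can now catch the specific exception type instead of a bare Exception.
try:
    validate_upsert_flags(False, False)
except ValueError as exc:
    message = str(exc)
```

Callers that only want to handle bad arguments can then write `except ValueError` without also swallowing unrelated failures.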



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1066,97 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            self: the target Iceberg table to execute the upsert on
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on. These are essentially analogous to primary keys
+            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
+
+            Example Use Cases:
+                Case 1: Both Parameters = True (Full Upsert)
+                Existing row found → Update it
+                New row found → Insert it
+
+                Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
+                Existing row found → Do nothing (no updates)
+                New row found → Insert it
+
+                Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
+                Existing row found → Update it
+                New row found → Do nothing (no inserts)
+
+                Case 4: Both Parameters = False (No Merge Effect)
+                Existing row found → Do nothing
+                New row found → Do nothing
+                (Function effectively does nothing)
+
+
+        Returns: a UpsertResult class (contains details of rows updated and inserted)
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == False:
+            raise Exception('no upsert options selected...exiting')
+
+        if upsert_util.has_duplicate_rows(df, join_cols):
+
+            raise Exception('Duplicate rows found in source dataset based on the key columns. No upsert executed')

Review Comment:
   ```suggestion
               raise ValueError('Duplicate rows found in source dataset based on the key columns. No upsert executed')
   ```
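
As a rough sketch of the same idea outside of Arrow: the check below counts key tuples over plain dictionaries and raises `ValueError` on a repeat. The names `has_duplicate_keys` and `check_source` are hypothetical stand-ins, and this is not the implementation of `upsert_util.has_duplicate_rows` from the PR.

```python
from collections import Counter
from typing import Any, Dict, List, Sequence


def has_duplicate_keys(rows: Sequence[Dict[str, Any]], join_cols: List[str]) -> bool:
    """Return True if any combination of join-column values occurs more than once."""
    key_counts = Counter(tuple(row[col] for col in join_cols) for row in rows)
    return any(count > 1 for count in key_counts.values())


def check_source(rows: Sequence[Dict[str, Any]], join_cols: List[str]) -> None:
    """Raise ValueError when the source rows are ambiguous for an upsert."""
    if has_duplicate_keys(rows, join_cols):
        raise ValueError(
            "Duplicate rows found in source dataset based on the key columns. No upsert executed"
        )


unique_rows = [{"id": 1, "city": "Amsterdam"}, {"id": 2, "city": "Drachten"}]
duplicated_rows = unique_rows + [{"id": 1, "city": "Paris"}]

unique_ok = not has_duplicate_keys(unique_rows, ["id"])
duplicates_found = has_duplicate_keys(duplicated_rows, ["id"])
```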



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1066,97 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            self: the target Iceberg table to execute the upsert on
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on. These are essentially analogous to primary keys
+            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
+
+            Example Use Cases:
+                Case 1: Both Parameters = True (Full Upsert)
+                Existing row found → Update it
+                New row found → Insert it
+
+                Case 2: when_matched_update_all = False, when_not_matched_insert_all = True
+                Existing row found → Do nothing (no updates)
+                New row found → Insert it
+
+                Case 3: when_matched_update_all = True, when_not_matched_insert_all = False
+                Existing row found → Update it
+                New row found → Do nothing (no inserts)
+
+                Case 4: Both Parameters = False (No Merge Effect)
+                Existing row found → Do nothing
+                New row found → Do nothing
+                (Function effectively does nothing)
+
+
+        Returns: a UpsertResult class (contains details of rows updated and inserted)
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == False:
+            raise Exception('no upsert options selected...exiting')
+
+        if upsert_util.has_duplicate_rows(df, join_cols):
+
+            raise Exception('Duplicate rows found in source dataset based on the key columns. No upsert executed')
+
+        #get list of rows that exist so we don't have to load the entire target table
+        matched_predicate = upsert_util.create_match_filter(df, join_cols)
+        matched_iceberg_table = self.scan(row_filter=matched_predicate).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        try:
+
+            with self.transaction() as txn:
+            
+                if when_matched_update_all:
+                    
+                    #function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
+                    rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)

Review Comment:
   The tests are not working on my end, but I think we can replace this with 
something as simple as:
   ```suggestion
                       existing_matches_expr = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
                       rows_to_update = df.filter(expression_to_pyarrow(existing_matches_expr))
   ```
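
To make the intent of that suggestion concrete, here is a small sketch over plain dictionaries rather than Arrow tables. The function `rows_with_existing_keys` is a hypothetical stand-in for "build a match filter from the matched target rows, then filter the source with it"; it is not the PR's code.

```python
from typing import Any, Dict, List, Sequence


def rows_with_existing_keys(
    source_rows: Sequence[Dict[str, Any]],
    matched_target_rows: Sequence[Dict[str, Any]],
    join_cols: List[str],
) -> List[Dict[str, Any]]:
    """Keep the source rows whose join-column values already exist in the target."""
    # Collect the key tuples present in the (already filtered) target rows.
    existing_keys = {tuple(row[col] for col in join_cols) for row in matched_target_rows}
    # Select source rows by key membership only; non-key columns are not compared.
    return [row for row in source_rows if tuple(row[col] for col in join_cols) in existing_keys]


source = [
    {"id": 1, "city": "Amsterdam"},  # key exists, value unchanged
    {"id": 2, "city": "Groningen"},  # key exists, value changed
    {"id": 3, "city": "Paris"},      # new key
]
target = [{"id": 1, "city": "Amsterdam"}, {"id": 2, "city": "Drachten"}]

rows_to_update = rows_with_existing_keys(source, target, ["id"])
```

One difference worth noting in this sketch: it selects every source row whose key exists in the target, including rows whose non-key values are unchanged, whereas the code comment in the diff says `get_rows_to_update` also checks whether any non-key value actually changed.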



##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset based on the key columns. No upsert executed'}
+
+        #get list of rows that exist so we don't have to load the entire target table
+        pred = upsert_util.get_filter_list(df, join_cols)
+        iceberg_table_trimmed = self.scan(row_filter=pred).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        try:
+
+            with self.transaction() as txn:
+            
+                if when_matched_update_all:
+
+                    update_recs = upsert_util.get_rows_to_update(df, iceberg_table_trimmed, join_cols)
+
+                    update_row_cnt = len(update_recs)
+
+                    overwrite_filter = upsert_util.get_filter_list(update_recs, join_cols)
+
+                    txn.overwrite(update_recs, overwrite_filter=overwrite_filter)
+
+
+                if when_not_matched_insert_all:
+                    
+                    insert_recs = upsert_util.get_rows_to_insert(df, iceberg_table_trimmed, join_cols)
+
+                    insert_row_cnt = len(insert_recs)
+
+                    txn.append(insert_recs)
+
+            return {
+                "rows_updated": update_row_cnt,
+                "rows_inserted": insert_row_cnt
+            }
+
+        except Exception as e:

Review Comment:
   I'm also in favor of removing the block 👍 
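
Assuming "the block" here means the `try`/`except` wrapper around the transaction, a toy sketch of why it may be unnecessary: a context-managed transaction can skip the commit when its body raises and let the exception reach the caller. `ToyTransaction` and `insert_rows` are hypothetical and make no claim about how PyIceberg's own transaction behaves.

```python
from typing import Any, List, Optional


class ToyTransaction:
    """Hypothetical transaction: stages rows and commits only on a clean exit."""

    def __init__(self) -> None:
        self.pending: List[Any] = []
        self.committed: List[Any] = []

    def __enter__(self) -> "ToyTransaction":
        return self

    def append(self, rows: List[Any]) -> None:
        self.pending.extend(rows)

    def __exit__(self, exc_type: Optional[type], exc: Optional[BaseException], tb: Any) -> bool:
        if exc_type is None:
            # No error inside the `with` body: make the staged rows visible.
            self.committed = list(self.pending)
        self.pending = []
        return False  # never swallow the exception; the caller sees it


def insert_rows(txn: ToyTransaction, rows: List[Any]) -> int:
    # No try/except here: a failure simply propagates out of the function.
    with txn:
        txn.append(rows)
        if any(row is None for row in rows):
            raise ValueError("source contains an empty row")
    return len(rows)
```

With this shape the function either returns a result or raises, so callers do not have to inspect a returned error message to find out that nothing was written.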



##########
pyiceberg/table/upsert_util.py:
##########
@@ -0,0 +1,154 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one

Review Comment:
   I'm pretty sure that the `lint` step will complain about the blank line before the license header
   ```suggestion
   # Licensed to the Apache Software Foundation (ASF) under one
   ```


