mattmartin14 commented on code in PR #1534:
URL: https://github.com/apache/iceberg-python/pull/1534#discussion_r1945153041


##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1067,78 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    @dataclass(frozen=True)
+    class UpsertResult:
+        """Summary the upsert operation"""
+        rows_updated: int = 0
+        rows_inserted: int = 0
+        info_msgs: Optional[str] = None
+        error_msgs: Optional[str] = None
+
+    def upsert(self, df: pa.Table, join_cols: list
+                   , when_matched_update_all: bool = True
+                   , when_not_matched_insert_all: bool = True
+                ) -> UpsertResult:
+        """
+        Shorthand API for performing an upsert to an iceberg table.
+        
+        Args:
+            df: The input dataframe to upsert with the table's data.
+            join_cols: The columns to join on.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+
+        Returns: a UpsertResult class
+        """
+
+        from pyiceberg.table import upsert_util
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'info_msgs': 'no 
upsert options selected...exiting'}
+            #return UpsertResult(info_msgs='no upsert options 
selected...exiting')
+
+        if upsert_util.dups_check_in_source(df, join_cols):
+
+            return {'error_msgs': 'Duplicate rows found in source dataset 
based on the key columns. No upsert executed'}
+
+        #get list of rows that exist so we don't have to load the entire 
target table
+        pred = upsert_util.get_filter_list(df, join_cols)
+        iceberg_table_trimmed = self.scan(row_filter=pred).to_arrow()
+
+        update_row_cnt = 0
+        insert_row_cnt = 0
+
+        try:
+
+            with self.transaction() as txn:
+            
+                if when_matched_update_all:
+
+                    update_recs = upsert_util.get_rows_to_update(df, 
iceberg_table_trimmed, join_cols)
+
+                    update_row_cnt = len(update_recs)
+
+                    overwrite_filter = 
upsert_util.get_filter_list(update_recs, join_cols)
+
+                    txn.overwrite(update_recs, 
overwrite_filter=overwrite_filter)    
+
+
+                if when_not_matched_insert_all:
+                    
+                    insert_recs = upsert_util.get_rows_to_insert(df, 
iceberg_table_trimmed, join_cols)
+
+                    insert_row_cnt = len(insert_recs)
+
+                    txn.append(insert_recs)
+
+            return {
+                "rows_updated": update_row_cnt,
+                "rows_inserted": insert_row_cnt
+            }

Review Comment:
   I'm getting this error:
   
   ```bash
   NameError: name 'UpsertResult' is not defined
   ```
   
   I've left the code commented out on line 1157 in this latest push/commit 
so you can take a look.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to