Fokko commented on code in PR #1817:
URL: https://github.com/apache/iceberg-python/pull/1817#discussion_r2086983291


##########
pyiceberg/table/__init__.py:
##########
@@ -695,6 +695,122 @@ def delete(
         if not delete_snapshot.files_affected and not 
delete_snapshot.rewrites_needed:
             warnings.warn("Delete operation did not match any records")
 
+    def upsert(
+        self,
+        df: pa.Table,
+        join_cols: Optional[List[str]] = None,
+        when_matched_update_all: bool = True,
+        when_not_matched_insert_all: bool = True,
+        case_sensitive: bool = True,
+    ) -> UpsertResult:
+        """Shorthand API for performing an upsert to an iceberg table.
+
+        Args:
+
+            df: The input dataframe to upsert with the table's data.
+            join_cols: Columns to join on, if not provided, it will use the 
identifier-field-ids.
+            when_matched_update_all: Bool indicating to update rows that are 
matched but require an update due to a value in a non-key column changing
+            when_not_matched_insert_all: Bool indicating new rows to be 
inserted that do not match any existing rows in the table
+            case_sensitive: Bool indicating if the match should be 
case-sensitive
+
+            To learn more about the identifier-field-ids: 
https://iceberg.apache.org/spec/#identifier-field-ids
+
+                Example Use Cases:
+                    Case 1: Both Parameters = True (Full Upsert)
+                    Existing row found → Update it
+                    New row found → Insert it
+
+                    Case 2: when_matched_update_all = False, 
when_not_matched_insert_all = True
+                    Existing row found → Do nothing (no updates)
+                    New row found → Insert it
+
+                    Case 3: when_matched_update_all = True, 
when_not_matched_insert_all = False
+                    Existing row found → Update it
+                    New row found → Do nothing (no inserts)
+
+                    Case 4: Both Parameters = False (No Merge Effect)
+                    Existing row found → Do nothing
+                    New row found → Do nothing
+                    (Function effectively does nothing)
+
+
+        Returns:
+            An UpsertResult class (contains details of rows updated and 
inserted)
+        """
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be 
installed") from e
+
+        from pyiceberg.io.pyarrow import expression_to_pyarrow
+        from pyiceberg.table import upsert_util
+
+        if join_cols is None:
+            join_cols = []
+            for field_id in self.table_metadata.schema().identifier_field_ids:
+                col = self.table_metadata.schema().find_column_name(field_id)
+                if col is not None:
+                    join_cols.append(col)
+                else:
+                    raise ValueError(f"Field-ID could not be found: 
{join_cols}")
+
+        if len(join_cols) == 0:
+            raise ValueError("Join columns could not be found, please set 
identifier-field-ids or pass in explicitly.")
+
+        if not when_matched_update_all and not when_not_matched_insert_all:
+            raise ValueError("no upsert options selected...exiting")
+
+        if upsert_util.has_duplicate_rows(df, join_cols):
+            raise ValueError("Duplicate rows found in source dataset based on 
the key columns. No upsert executed")
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
+
+        downcast_ns_timestamp_to_us = 
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.table_metadata.schema(), provided_schema=df.schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        # get list of rows that exist so we don't have to load the entire 
target table
+        matched_predicate = upsert_util.create_match_filter(df, join_cols)
+
+        # We must use Transaction.table_metadata for the scan. This includes 
all uncommitted - but relevant - changes.
+        matched_iceberg_table = DataScan(

Review Comment:
   Nice, thanks for pointing this out, and also for covering it in the tests 👍



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to