koenvo commented on code in PR #1817:
URL: https://github.com/apache/iceberg-python/pull/1817#discussion_r2086298291
##########
pyiceberg/table/__init__.py:
##########
@@ -695,6 +695,122 @@ def delete(
if not delete_snapshot.files_affected and not
delete_snapshot.rewrites_needed:
warnings.warn("Delete operation did not match any records")
+ def upsert(
+ self,
+ df: pa.Table,
+ join_cols: Optional[List[str]] = None,
+ when_matched_update_all: bool = True,
+ when_not_matched_insert_all: bool = True,
+ case_sensitive: bool = True,
+ ) -> UpsertResult:
+ """Shorthand API for performing an upsert to an iceberg table.
+
+ Args:
+
+ df: The input dataframe to upsert with the table's data.
+ join_cols: Columns to join on, if not provided, it will use the
identifier-field-ids.
+ when_matched_update_all: Bool indicating to update rows that are
matched but require an update due to a value in a non-key column changing
+ when_not_matched_insert_all: Bool indicating new rows to be
inserted that do not match any existing rows in the table
+ case_sensitive: Bool indicating if the match should be
case-sensitive
+
+ To learn more about the identifier-field-ids:
https://iceberg.apache.org/spec/#identifier-field-ids
+
+ Example Use Cases:
+ Case 1: Both Parameters = True (Full Upsert)
+ Existing row found → Update it
+ New row found → Insert it
+
+ Case 2: when_matched_update_all = False,
when_not_matched_insert_all = True
+ Existing row found → Do nothing (no updates)
+ New row found → Insert it
+
+ Case 3: when_matched_update_all = True,
when_not_matched_insert_all = False
+ Existing row found → Update it
+ New row found → Do nothing (no inserts)
+
+ Case 4: Both Parameters = False (No Merge Effect)
+ Existing row found → Do nothing
+ New row found → Do nothing
+ (Function effectively does nothing)
+
+
+ Returns:
+ An UpsertResult class (contains details of rows updated and
inserted)
+ """
+ try:
+ import pyarrow as pa # noqa: F401
+ except ModuleNotFoundError as e:
+ raise ModuleNotFoundError("For writes PyArrow needs to be
installed") from e
+
+ from pyiceberg.io.pyarrow import expression_to_pyarrow
+ from pyiceberg.table import upsert_util
+
+ if join_cols is None:
+ join_cols = []
+ for field_id in self.table_metadata.schema().identifier_field_ids:
+ col = self.table_metadata.schema().find_column_name(field_id)
+ if col is not None:
+ join_cols.append(col)
+ else:
+ raise ValueError(f"Field-ID could not be found:
{join_cols}")
+
+ if len(join_cols) == 0:
+ raise ValueError("Join columns could not be found, please set
identifier-field-ids or pass in explicitly.")
+
+ if not when_matched_update_all and not when_not_matched_insert_all:
+ raise ValueError("no upsert options selected...exiting")
+
+ if upsert_util.has_duplicate_rows(df, join_cols):
+ raise ValueError("Duplicate rows found in source dataset based on
the key columns. No upsert executed")
+
+ from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible
+
+ downcast_ns_timestamp_to_us =
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+ _check_pyarrow_schema_compatible(
+ self.table_metadata.schema(), provided_schema=df.schema,
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+ )
+
+ # get list of rows that exist so we don't have to load the entire
target table
+ matched_predicate = upsert_util.create_match_filter(df, join_cols)
+
+ # We must use Transaction.table_metadata for the scan. This includes
all uncommitted - but relevant - changes.
+ matched_iceberg_table = DataScan(
Review Comment:
This is the most important change. It required
https://github.com/apache/iceberg-python/pull/1903
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]