Fokko commented on code in PR #1534: URL: https://github.com/apache/iceberg-python/pull/1534#discussion_r1935573488
########## pyiceberg/table/__init__.py: ########## @@ -1064,6 +1064,125 @@ def name_mapping(self) -> Optional[NameMapping]: """Return the table's field-id NameMapping.""" return self.metadata.name_mapping() + def merge_rows(self, df: pa.Table, join_cols: list + ,merge_options: dict = {'when_matched_update_all': True, 'when_not_matched_insert_all': True} + ) -> Dict: + """ + Shorthand API for performing an upsert/merge to an iceberg table. + + Args: + df: The input dataframe to merge with the table's data. + join_cols: The columns to join on. + merge_options: A dictionary of merge actions to perform. Currently supports these predicates > + when_matched_update_all: default is True + when_not_matched_insert_all: default is True + + Returns: + A dictionary containing the number of rows updated and inserted. + """ + + from pyiceberg.table import merge_rows_util + + try: + from datafusion import SessionContext + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For merge_rows, DataFusion needs to be installed") from e + + try: + from pyarrow import dataset as ds + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For merge_rows, PyArrow needs to be installed") from e + + source_table_name = "source" + target_table_name = "target" + + if merge_options is None or merge_options == {}: + merge_options = {'when_matched_update_all': True, 'when_not_matched_insert_all': True} + + when_matched_update_all = merge_options.get('when_matched_update_all', False) + when_not_matched_insert_all = merge_options.get('when_not_matched_insert_all', False) + + if when_matched_update_all == False and when_not_matched_insert_all == False: + return {'rows_updated': 0, 'rows_inserted': 0, 'msg': 'no merge options selected...exiting'} + + ctx = SessionContext() + + #register both source and target tables so we can find the deltas to update/append + ctx.register_dataset(source_table_name, ds.dataset(df)) + ctx.register_dataset(target_table_name, ds.dataset(self.scan().to_arrow())) + + source_col_list = merge_rows_util.get_table_column_list(ctx, source_table_name) + target_col_list = merge_rows_util.get_table_column_list(ctx, target_table_name) + + source_col_names = set([col[0] for col in source_col_list]) + target_col_names = set([col[0] for col in target_col_list]) + + source_col_types = {col[0]: col[1] for col in source_col_list} + + missing_columns = merge_rows_util.do_join_columns_exist(source_col_names, target_col_names, join_cols) + + if missing_columns['source'] or missing_columns['target']: + + return {'error_msgs': f"Join columns missing in tables: Source table columns missing: {missing_columns['source']}, Target table columns missing: {missing_columns['target']}"} + + #check for dups on source + if merge_rows_util.dups_check_in_source(source_table_name, join_cols, ctx): + + return {'error_msgs': 'Duplicate rows found in source dataset based on the key columns. No Merge executed'} + + update_row_cnt = 0 + insert_row_cnt = 0 + + txn = self.transaction() + + try: + + if when_matched_update_all: + + update_recs_sql = merge_rows_util.get_rows_to_update_sql(source_table_name, target_table_name, join_cols, source_col_names, target_col_names) + + update_recs = ctx.sql(update_recs_sql).to_arrow_table() + + update_row_cnt = len(update_recs) + + if len(join_cols) == 1: + join_col = join_cols[0] + col_type = source_col_types[join_col] + values = [row[join_col] for row in update_recs.to_pylist()] + # if strings are in the filter, we encapsulate with tick marks + formatted_values = [f"'{value}'" if col_type == 'string' else str(value) for value in values] + overwrite_filter = f"{join_col} IN ({', '.join(formatted_values)})" Review Comment: Happy to elaborate. The SQL based syntax is supported, mostly to make it easier for humans. If you follow the predicate in the code, first at `txn.overwrite(update_recs, overwrite_filter)`, and then we come to the `.delete()` method: ```python if isinstance(delete_filter, str): delete_filter = _parse_row_filter(delete_filter) ``` What we do here, is we take the string, and turn it into a `BooleanExpression`, as an example:  This converts the string into an `In('col', [1,2,3])` method. Since we don't need to use SQL here, we can optimize this by pushing down `In(..)` directly, and avoid conversion back and forth. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org