arul-cc commented on issue #2120: URL: https://github.com/apache/iceberg-python/issues/2120#issuecomment-2989751909
> > Suggestions for safe parallel upsert patterns in Iceberg
>
> maybe a good approach is to start a transaction, and only commit at the very end after all the upsert calls

Hi @kevinjqliu, when I checked the source code, I found that `upsert` already runs inside a single transaction (`self.transaction()`). Attaching the relevant source code:

```python
with self.transaction() as tx:
    if when_matched_update_all:
        # function get_rows_to_update is doing a check on non-key columns to see if any of the values have actually changed
        # we don't want to do just a blanket overwrite for matched rows if the actual non-key column data hasn't changed
        # this extra step avoids unnecessary IO and writes
        rows_to_update = upsert_util.get_rows_to_update(df, matched_iceberg_table, join_cols)

        update_row_cnt = len(rows_to_update)

        if len(rows_to_update) > 0:
            # build the match predicate filter
            overwrite_mask_predicate = upsert_util.create_match_filter(rows_to_update, join_cols)

            tx.overwrite(rows_to_update, overwrite_filter=overwrite_mask_predicate)

    if when_not_matched_insert_all:
        expr_match = upsert_util.create_match_filter(matched_iceberg_table, join_cols)
        expr_match_bound = bind(self.schema(), expr_match, case_sensitive=case_sensitive)
        expr_match_arrow = expression_to_pyarrow(expr_match_bound)
        rows_to_insert = df.filter(~expr_match_arrow)

        insert_row_cnt = len(rows_to_insert)

        if insert_row_cnt > 0:
            tx.append(rows_to_insert)
```
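In the excerpt, the `with self.transaction() as tx:` block commits when it exits, so each `upsert` call produces its own commit. The suggestion quoted above instead wraps several upserts in one outer transaction that commits once. The difference can be sketched with a toy model. This is plain Python, not PyIceberg's API: `ToyTable`, `upsert`, and `CommitConflict` are hypothetical names. The model assumes a simplified optimistic-concurrency rule in which a commit fails if the table's snapshot changed after the transaction started. Real Iceberg catalogs validate commit requirements in more detail than this.

```python
from contextlib import contextmanager
from dataclasses import dataclass, field


class CommitConflict(Exception):
    """Hypothetical error: the transaction's base snapshot is no longer current."""


@dataclass
class ToyTable:
    # Hypothetical in-memory stand-in for a table: rows keyed by a join column.
    rows: dict = field(default_factory=dict)
    snapshot_id: int = 0
    commits: int = 0

    @contextmanager
    def transaction(self):
        base = self.snapshot_id  # snapshot this transaction started from
        staged = {}              # changes buffered until commit
        yield staged             # if the body raises, nothing below runs (no commit)
        # Optimistic check at commit time: fail if another writer committed meanwhile.
        if self.snapshot_id != base:
            raise CommitConflict(f"expected snapshot {base}, found {self.snapshot_id}")
        self.rows.update(staged)
        self.snapshot_id += 1
        self.commits += 1


def upsert(staged: dict, batch: dict) -> None:
    # Hypothetical upsert: matched keys are overwritten, unmatched keys are inserted.
    staged.update(batch)


batches = ({"a": 1}, {"b": 2}, {"a": 3})

# Pattern 1: one transaction per upsert call, so one commit per call.
t1 = ToyTable()
for batch in batches:
    with t1.transaction() as tx:
        upsert(tx, batch)

# Pattern 2: one outer transaction, committed once after all upserts.
t2 = ToyTable()
with t2.transaction() as tx:
    for batch in batches:
        upsert(tx, batch)

# Two writers that start from the same snapshot: whichever commits second conflicts.
t3 = ToyTable()
conflict = None
try:
    with t3.transaction() as tx_a:
        with t3.transaction() as tx_b:
            upsert(tx_b, {"a": 1})
        # tx_b has committed here, advancing the snapshot underneath tx_a.
        upsert(tx_a, {"a": 2})
except CommitConflict as err:
    conflict = str(err)

print(t1.commits, t2.commits, t1.rows, conflict)
```

In this model both patterns end with the same rows, but the per-call pattern creates three snapshots while the outer transaction creates one. Neither pattern by itself avoids the conflict in the last case, where two writers start from the same snapshot.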