kevinjqliu commented on code in PR #1685:
URL: https://github.com/apache/iceberg-python/pull/1685#discussion_r1963933055


##########
pyiceberg/table/upsert_util.py:
##########
@@ -53,42 +53,18 @@ def get_rows_to_update(source_table: pa.Table, 
target_table: pa.Table, join_cols
     """
     all_columns = set(source_table.column_names)
     join_cols_set = set(join_cols)
-
-    non_key_cols = list(all_columns - join_cols_set)
-
-    match_expr = functools.reduce(operator.and_, 
[pc.field(col).isin(target_table.column(col).to_pylist()) for col in join_cols])
-
-    matching_source_rows = source_table.filter(match_expr)
-
-    rows_to_update = []
-
-    for index in range(matching_source_rows.num_rows):
-        source_row = matching_source_rows.slice(index, 1)
-
-        target_filter = functools.reduce(operator.and_, [pc.field(col) == 
source_row.column(col)[0].as_py() for col in join_cols])
-
-        matching_target_row = target_table.filter(target_filter)
-
-        if matching_target_row.num_rows > 0:
-            needs_update = False
-
-            for non_key_col in non_key_cols:
-                source_value = source_row.column(non_key_col)[0].as_py()
-                target_value = 
matching_target_row.column(non_key_col)[0].as_py()
-
-                if source_value != target_value:
-                    needs_update = True
-                    break
-
-            if needs_update:
-                rows_to_update.append(source_row)
-
-    if rows_to_update:
-        rows_to_update_table = pa.concat_tables(rows_to_update)
-    else:
-        rows_to_update_table = pa.Table.from_arrays([], 
names=source_table.column_names)
-
-    common_columns = 
set(source_table.column_names).intersection(set(target_table.column_names))
-    rows_to_update_table = rows_to_update_table.select(list(common_columns))
-
-    return rows_to_update_table
+    non_key_cols = all_columns - join_cols_set
+
+    diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != 
pc.field(f"{col}-rhs") for col in non_key_cols])
+
+    return (
+        source_table
+        # We already know that the schema is compatible, this is to fix large_ 
types
+        .cast(target_table.schema)
+        .join(target_table, keys=list(join_cols_set), join_type="inner", 
left_suffix="-lhs", right_suffix="-rhs")

Review Comment:
   nit: should we add `coalesce_keys=True` here to avoid duplicates in the 
resulting join table?
   
   Since we only check [if source_table has 
duplicates](https://github.com/apache/iceberg-python/blob/d1fea5c2bdd4cec248c20c0af51cf6c49966b7dd/pyiceberg/table/__init__.py#L1165),
 the target_table might produce duplicates.



##########
pyiceberg/table/upsert_util.py:
##########
@@ -53,42 +53,18 @@ def get_rows_to_update(source_table: pa.Table, 
target_table: pa.Table, join_cols
     """
     all_columns = set(source_table.column_names)
     join_cols_set = set(join_cols)
-
-    non_key_cols = list(all_columns - join_cols_set)
-
-    match_expr = functools.reduce(operator.and_, 
[pc.field(col).isin(target_table.column(col).to_pylist()) for col in join_cols])
-
-    matching_source_rows = source_table.filter(match_expr)
-
-    rows_to_update = []
-
-    for index in range(matching_source_rows.num_rows):
-        source_row = matching_source_rows.slice(index, 1)
-
-        target_filter = functools.reduce(operator.and_, [pc.field(col) == 
source_row.column(col)[0].as_py() for col in join_cols])
-
-        matching_target_row = target_table.filter(target_filter)
-
-        if matching_target_row.num_rows > 0:
-            needs_update = False
-
-            for non_key_col in non_key_cols:
-                source_value = source_row.column(non_key_col)[0].as_py()
-                target_value = 
matching_target_row.column(non_key_col)[0].as_py()
-
-                if source_value != target_value:
-                    needs_update = True
-                    break
-
-            if needs_update:
-                rows_to_update.append(source_row)
-
-    if rows_to_update:
-        rows_to_update_table = pa.concat_tables(rows_to_update)
-    else:
-        rows_to_update_table = pa.Table.from_arrays([], 
names=source_table.column_names)
-
-    common_columns = 
set(source_table.column_names).intersection(set(target_table.column_names))
-    rows_to_update_table = rows_to_update_table.select(list(common_columns))
-
-    return rows_to_update_table
+    non_key_cols = all_columns - join_cols_set
+
+    diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != 
pc.field(f"{col}-rhs") for col in non_key_cols])

Review Comment:
   De Morgan's law in the wild 🥇



##########
pyiceberg/table/upsert_util.py:
##########
@@ -53,42 +53,18 @@ def get_rows_to_update(source_table: pa.Table, 
target_table: pa.Table, join_cols
     """
     all_columns = set(source_table.column_names)
     join_cols_set = set(join_cols)
-
-    non_key_cols = list(all_columns - join_cols_set)
-
-    match_expr = functools.reduce(operator.and_, 
[pc.field(col).isin(target_table.column(col).to_pylist()) for col in join_cols])
-
-    matching_source_rows = source_table.filter(match_expr)
-
-    rows_to_update = []
-
-    for index in range(matching_source_rows.num_rows):
-        source_row = matching_source_rows.slice(index, 1)
-
-        target_filter = functools.reduce(operator.and_, [pc.field(col) == 
source_row.column(col)[0].as_py() for col in join_cols])
-
-        matching_target_row = target_table.filter(target_filter)
-
-        if matching_target_row.num_rows > 0:
-            needs_update = False
-
-            for non_key_col in non_key_cols:
-                source_value = source_row.column(non_key_col)[0].as_py()
-                target_value = 
matching_target_row.column(non_key_col)[0].as_py()
-
-                if source_value != target_value:
-                    needs_update = True
-                    break
-
-            if needs_update:
-                rows_to_update.append(source_row)
-
-    if rows_to_update:
-        rows_to_update_table = pa.concat_tables(rows_to_update)
-    else:
-        rows_to_update_table = pa.Table.from_arrays([], 
names=source_table.column_names)
-
-    common_columns = 
set(source_table.column_names).intersection(set(target_table.column_names))
-    rows_to_update_table = rows_to_update_table.select(list(common_columns))
-
-    return rows_to_update_table
+    non_key_cols = all_columns - join_cols_set
+
+    diff_expr = functools.reduce(operator.or_, [pc.field(f"{col}-lhs") != 
pc.field(f"{col}-rhs") for col in non_key_cols])
+
+    return (
+        source_table
+        # We already know that the schema is compatible, this is to fix large_ 
types
+        .cast(target_table.schema)
+        .join(target_table, keys=list(join_cols_set), join_type="inner", 
left_suffix="-lhs", right_suffix="-rhs")
+        .filter(diff_expr)
+        .drop_columns([f"{col}-rhs" for col in non_key_cols])
+        .rename_columns({f"{col}-lhs" if col not in join_cols else col: col 
for col in source_table.column_names})

Review Comment:
   Oh, this is a dictionary: 
https://arrow.apache.org/docs/python/generated/pyarrow.Table.html#pyarrow.Table.rename_columns
   — and the non-join columns will be ignored by `create_match_filter`.



##########
pyiceberg/table/upsert_util.py:
##########
@@ -53,42 +53,18 @@ def get_rows_to_update(source_table: pa.Table, 
target_table: pa.Table, join_cols
     """

Review Comment:
   nit: update the docstring to match the new process.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to