koenvo commented on code in PR #1878:
URL: https://github.com/apache/iceberg-python/pull/1878#discussion_r2051737601


##########
pyiceberg/table/upsert_util.py:
##########
@@ -82,14 +82,54 @@ def get_rows_to_update(source_table: pa.Table, 
target_table: pa.Table, join_cols
         ],
     )
 
-    return (
-        source_table
-        # We already know that the schema is compatible, this is to fix large_ 
types
-        .cast(target_table.schema)
-        .join(target_table, keys=list(join_cols_set), join_type="inner", 
left_suffix="-lhs", right_suffix="-rhs")
-        .filter(diff_expr)
-        .drop_columns([f"{col}-rhs" for col in non_key_cols])
-        .rename_columns({f"{col}-lhs" if col not in join_cols else col: col 
for col in source_table.column_names})
-        # Finally cast to the original schema since it doesn't carry 
nullability:
-        # https://github.com/apache/arrow/issues/45557
-    ).cast(target_table.schema)
+    try:
+        return (
+            source_table
+            # We already know that the schema is compatible, this is to fix 
large_ types
+            .cast(target_table.schema)
+            .join(target_table, keys=list(join_cols_set), join_type="inner", 
left_suffix="-lhs", right_suffix="-rhs")
+            .filter(diff_expr)
+            .drop_columns([f"{col}-rhs" for col in non_key_cols])
+            .rename_columns({f"{col}-lhs" if col not in join_cols else col: 
col for col in source_table.column_names})
+            # Finally cast to the original schema since it doesn't carry 
nullability:
+            # https://github.com/apache/arrow/issues/45557
+        ).cast(target_table.schema)
+    except pa.ArrowInvalid:
+        # When we are not able to compare (e.g. due to unsupported types),
+        # fall back to selecting only rows in the source table that do NOT 
already exist in the target.
+        # See: https://github.com/apache/arrow/issues/35785
+        MARKER_COLUMN_NAME = "__from_target"
+        INDEX_COLUMN_NAME = "__source_index"
+
+        if MARKER_COLUMN_NAME in join_cols_set or INDEX_COLUMN_NAME in 
join_cols_set:
+            raise ValueError(
+                f"{MARKER_COLUMN_NAME} and {INDEX_COLUMN_NAME} are reserved 
for joining "
+                f"DataFrames, and cannot be used as column names"
+            ) from None
+

Review Comment:
   There doesn't seem to be a simple way of checking whether a type is supported in 
a join. The `IsSupported` function ( 
https://github.com/apache/arrow/blob/c54b039c77dd0bfa822bc0a54c7f4ca1189e0d57/cpp/src/arrow/acero/hash_join_node.cc#L49-L58
 ) is not available from PyArrow.
   
   Could do another try/except around the join, but that doesn't feel right.



##########
pyiceberg/table/upsert_util.py:
##########
@@ -82,14 +82,54 @@ def get_rows_to_update(source_table: pa.Table, 
target_table: pa.Table, join_cols
         ],
     )
 
-    return (
-        source_table
-        # We already know that the schema is compatible, this is to fix large_ 
types
-        .cast(target_table.schema)
-        .join(target_table, keys=list(join_cols_set), join_type="inner", 
left_suffix="-lhs", right_suffix="-rhs")
-        .filter(diff_expr)
-        .drop_columns([f"{col}-rhs" for col in non_key_cols])
-        .rename_columns({f"{col}-lhs" if col not in join_cols else col: col 
for col in source_table.column_names})
-        # Finally cast to the original schema since it doesn't carry 
nullability:
-        # https://github.com/apache/arrow/issues/45557
-    ).cast(target_table.schema)
+    try:
+        return (
+            source_table
+            # We already know that the schema is compatible, this is to fix 
large_ types
+            .cast(target_table.schema)
+            .join(target_table, keys=list(join_cols_set), join_type="inner", 
left_suffix="-lhs", right_suffix="-rhs")
+            .filter(diff_expr)
+            .drop_columns([f"{col}-rhs" for col in non_key_cols])
+            .rename_columns({f"{col}-lhs" if col not in join_cols else col: 
col for col in source_table.column_names})
+            # Finally cast to the original schema since it doesn't carry 
nullability:
+            # https://github.com/apache/arrow/issues/45557
+        ).cast(target_table.schema)
+    except pa.ArrowInvalid:
+        # When we are not able to compare (e.g. due to unsupported types),
+        # fall back to selecting only rows in the source table that do NOT 
already exist in the target.
+        # See: https://github.com/apache/arrow/issues/35785
+        MARKER_COLUMN_NAME = "__from_target"
+        INDEX_COLUMN_NAME = "__source_index"
+
+        if MARKER_COLUMN_NAME in join_cols_set or INDEX_COLUMN_NAME in 
join_cols_set:
+            raise ValueError(
+                f"{MARKER_COLUMN_NAME} and {INDEX_COLUMN_NAME} are reserved 
for joining "
+                f"DataFrames, and cannot be used as column names"
+            ) from None
+
+        # Step 1: Prepare source index with join keys and a marker index
+        # Cast to target table schema, so we can do the join
+        # See: https://github.com/apache/arrow/issues/37542
+        source_index = (
+            source_table.cast(target_table.schema)
+            .select(join_cols_set)
+            .append_column(INDEX_COLUMN_NAME, 
pa.array(range(len(source_table))))
+        )
+
+        # Step 2: Prepare target index with join keys and a marker
+        target_index = 
target_table.select(join_cols_set).append_column(MARKER_COLUMN_NAME, 
pa.repeat(True, len(target_table)))
+
+        # Step 3: Perform a left outer join to find which rows from source 
exist in target
+        joined = source_index.join(target_index, keys=list(join_cols_set), 
join_type="left outer")
+
+        # Step 4: Restore original source order
+        joined = joined.sort_by(INDEX_COLUMN_NAME)
+
+        # Step 5: Create a boolean mask for rows that do exist in the target
+        # i.e., where marker column is true after the join
+        to_update_mask = pc.invert(pc.is_null(joined[MARKER_COLUMN_NAME]))

Review Comment:
   Good one.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to