Fokko commented on code in PR #1534:
URL: https://github.com/apache/iceberg-python/pull/1534#discussion_r1935566788


##########
pyiceberg/table/__init__.py:
##########
@@ -1064,6 +1064,125 @@ def name_mapping(self) -> Optional[NameMapping]:
         """Return the table's field-id NameMapping."""
         return self.metadata.name_mapping()
 
+    def merge_rows(self, df: pa.Table, join_cols: list
+                    ,merge_options: dict = {'when_matched_update_all': True, 
'when_not_matched_insert_all': True}
+                ) -> Dict:
+        """
+        Shorthand API for performing an upsert/merge to an iceberg table.
+        
+        Args:
+            df: The input dataframe to merge with the table's data.
+            join_cols: The columns to join on.
+            merge_options: A dictionary of merge actions to perform. Currently 
supports these predicates >
+                when_matched_update_all: default is True
+                when_not_matched_insert_all: default is True
+
+        Returns:
+            A dictionary containing the number of rows updated and inserted.
+        """
+
+        from pyiceberg.table import merge_rows_util
+
+        try:
+            from datafusion import SessionContext
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For merge_rows, DataFusion needs to be 
installed") from e
+        
+        try:
+            from pyarrow import dataset as ds
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For merge_rows, PyArrow needs to be 
installed") from e
+
+        source_table_name = "source"
+        target_table_name = "target"
+
+        if merge_options is None or merge_options == {}:
+            merge_options = {'when_matched_update_all': True, 
'when_not_matched_insert_all': True}
+
+        when_matched_update_all = merge_options.get('when_matched_update_all', 
False)
+        when_not_matched_insert_all = 
merge_options.get('when_not_matched_insert_all', False)
+
+        if when_matched_update_all == False and when_not_matched_insert_all == 
False:
+            return {'rows_updated': 0, 'rows_inserted': 0, 'msg': 'no merge 
options selected...exiting'}
+
+        ctx = SessionContext()
+
+        #register both source and target tables so we can find the deltas to 
update/append
+        ctx.register_dataset(source_table_name, ds.dataset(df))
+        ctx.register_dataset(target_table_name, 
ds.dataset(self.scan().to_arrow()))

Review Comment:
   Sorry, I jumped a bit to conclusions in my previous comment. Allow me to 
elaborate.
   
   Even with the `to_arrow()` that we have above, we pull in the whole table. 
Based on the Iceberg statistics, we can jump over data that's not needed at 
all. Instead, I would suggest constructing the predicate, and then pull in the 
data:
   
   ```python
   unique_keys = df.select(join_cols).group_by(join_cols).aggregate([])
   if len(join_cols) == 1:
       pred = In(join_cols[0], unique_keys[0].to_pylist())
   else:
       pred = Or(*[
           And(*[
               EqualTo(col, row[col])
               for col in join_cols
           ])
           for row in unique_keys.to_pylist()
       ])
   ctx.register_dataset(target_table_name, 
ds.dataset(self.scan(row_filter=pred).to_arrow()))
   ```
   
   This way we only load in Parquet files that are relevant to the set of keys 
that we're looking for.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to