syun64 commented on code in PR #931:
URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1678319796


##########
pyiceberg/table/__init__.py:
##########
@@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
 
+    def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in delete_partitions:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos in range(len(partition_fields)):
+                predicate = (
+                    EqualTo(Reference(partition_fields[pos]), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_fields[pos]))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for adding a table dynamic overwrite with a PyArrow table to the transaction.
+
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        _check_schema_compatible(self._table.schema(), other_schema=df.schema)
+
+        # cast if the two schemas are compatible but not equal
+        table_arrow_schema = self._table.schema().as_arrow()
+        if table_arrow_schema != df.schema:
+            df = df.cast(table_arrow_schema)
+
+        # If dataframe does not have data, there is no need to overwrite
+        if df.shape[0] == 0:
+            return
+
+        append_snapshot_commit_uuid = uuid.uuid4()
+        data_files: List[DataFile] = list(
+            _dataframe_to_data_files(
+                table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
+            )
+        )
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_partitions = [data_file.partition for data_file in data_files]
+            delete_filter = self._build_partition_predicate(
+                spec_id=self.table_metadata.spec().spec_id, delete_partitions=delete_partitions
+            )
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append(
+            append_snapshot_commit_uuid
+        ) as append_snapshot:

Review Comment:
   What are your thoughts on using `merge_append` when the `MANIFEST_MERGE_ENABLED` property is set to true? In the Java implementation, append, overwrite, and replacePartitions (dynamic overwrite) all use the MergingSnapshotProducer, which respects this property.
   
   ```suggestion
           manifest_merge_enabled = PropertyUtil.property_as_bool(
               self.table_metadata.properties,
               TableProperties.MANIFEST_MERGE_ENABLED,
               TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
           )
            update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
            append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append
   
           with append_method() as append_snapshot:
   ```
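
   If it helps to see the property end to end, here's a rough, self-contained sketch (not part of this PR) of creating a table with manifest merging enabled, so the `merge_append` branch above would be taken. The catalog and table names are made up, and I'm assuming the property key matches the Java `commit.manifest-merge.enabled` constant:
   
   ```python
   from pyiceberg.catalog import load_catalog
   from pyiceberg.schema import Schema
   from pyiceberg.types import NestedField, StringType
   
   # Hypothetical catalog and table names; the property key is assumed to mirror
   # the Java TableProperties constant ("commit.manifest-merge.enabled").
   catalog = load_catalog("default")
   table = catalog.create_table(
       "db.events",
       schema=Schema(NestedField(1, "region", StringType(), required=False)),
       properties={"commit.manifest-merge.enabled": "true"},
   )
   ```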



##########
pyiceberg/table/__init__.py:
##########
@@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
 
+    def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in delete_partitions:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos in range(len(partition_fields)):
+                predicate = (
+                    EqualTo(Reference(partition_fields[pos]), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_fields[pos]))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for adding a table dynamic overwrite with a PyArrow table to the transaction.
+
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        _check_schema_compatible(self._table.schema(), other_schema=df.schema)
+
+        # cast if the two schemas are compatible but not equal
+        table_arrow_schema = self._table.schema().as_arrow()
+        if table_arrow_schema != df.schema:
+            df = df.cast(table_arrow_schema)

Review Comment:
   Just an FYI: there's a 
[PR](https://github.com/apache/iceberg-python/pull/921/files) that changes this 
block of code



##########
pyiceberg/table/__init__.py:
##########
@@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
 
+    def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression:

Review Comment:
   nit: I found the `delete_partitions` argument a bit confusing here, because this function just translates a set of partition record values into the corresponding predicate. Could we rename it to something more generic to indicate that? We should also remove `spec_id`, which isn't used in this function.
   
   ```suggestion
    def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
   ```
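
   For illustration, here's a rough sketch (not part of the PR) of the predicate this helper builds for a table partitioned by two fields. The field names `region` and `day` and the partition values are made up, and plain tuples stand in for the `Record` values; the helper ORs together one AND-of-equalities per partition, using `IsNull` for null partition values:
   
   ```python
   from pyiceberg.expressions import (
       AlwaysFalse,
       AlwaysTrue,
       And,
       BooleanExpression,
       EqualTo,
       IsNull,
       Or,
       Reference,
   )
   
   # Hypothetical partition field names and partition values.
   partition_fields = ["region", "day"]
   partition_records = [("eu", "2024-07-01"), (None, "2024-07-02")]
   
   expr: BooleanExpression = AlwaysFalse()
   for record in partition_records:
       match_expr: BooleanExpression = AlwaysTrue()
       for pos, field_name in enumerate(partition_fields):
           value = record[pos]
           predicate = EqualTo(Reference(field_name), value) if value is not None else IsNull(Reference(field_name))
           match_expr = And(match_expr, predicate)
       expr = Or(expr, match_expr)
   
   # expr is equivalent to:
   #   (region == "eu" AND day == "2024-07-01") OR (region IS NULL AND day == "2024-07-02")
   print(expr)
   ```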



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

