Fokko commented on code in PR #931:
URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1829272105


##########
pyiceberg/table/__init__.py:
##########
@@ -456,6 +461,89 @@ def append(self, df: pa.Table, snapshot_properties: 
Dict[str, str] = EMPTY_DICT)
                 for data_file in data_files:
                     append_files.append_data_file(data_file)
 
+    def _build_partition_predicate(self, partition_records: Set[Record]) -> 
BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field 
in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in partition_records:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos, partition_field in enumerate(partition_fields):
+                predicate = (
+                    EqualTo(Reference(partition_field), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_field))
+                )
+                match_partition_expression = And(match_partition_expression, 
predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def dynamic_partition_overwrite(self, df: pa.Table, snapshot_properties: 
Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for overwriting existing partitions with a PyArrow table.
+
+        The function detects partition values in the provided arrow table 
using the current
+        partition spec, and deletes existing partitions matching these values. 
Finally, the
+        data in the table is appended to the table.
+
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot 
summary
+        """
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be 
installed") from e
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, 
_dataframe_to_data_files
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        if self.table_metadata.spec().is_unpartitioned():
+            raise ValueError("Cannot apply dynamic overwrite on an 
unpartitioned table.")
+
+        for field in self.table_metadata.spec().fields:
+            if not isinstance(field.transform, IdentityTransform):
+                raise ValueError(
+                    f"For now dynamic overwrite does not support a table with 
non-identity-transform field in the latest partition spec: {field}"
+                )
+
+        downcast_ns_timestamp_to_us = 
Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.table_metadata.schema(), provided_schema=df.schema, 
downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        # If dataframe does not have data, there is no need to overwrite
+        if df.shape[0] == 0:
+            return
+
+        append_snapshot_commit_uuid = uuid.uuid4()
+        data_files: List[DataFile] = list(
+            _dataframe_to_data_files(
+                table_metadata=self._table.metadata, 
write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
+            )
+        )
+
+        partitions_to_overwrite = {data_file.partition for data_file in 
data_files}
+        delete_filter = 
self._build_partition_predicate(partition_records=partitions_to_overwrite)
+        self.delete(delete_filter=delete_filter, 
snapshot_properties=snapshot_properties)
+
+        manifest_merge_enabled = property_as_bool(
+            self.table_metadata.properties,
+            TableProperties.MANIFEST_MERGE_ENABLED,
+            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
+        )
+        update_snapshot = 
self.update_snapshot(snapshot_properties=snapshot_properties)
+        append_method = update_snapshot.merge_append if manifest_merge_enabled 
else update_snapshot.fast_append

Review Comment:
   This logic is duplicated below as well, maybe move it into a function?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to