sungwy commented on code in PR #931:
URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1761801698
##########
pyiceberg/table/__init__.py:
##########
@@ -456,6 +461,85 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT)
             for data_file in data_files:
                 append_files.append_data_file(data_file)
 
+    def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression:
+        partition_spec = self.table_metadata.spec()
+        schema = self.table_metadata.schema()
+        partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields]
+
+        expr: BooleanExpression = AlwaysFalse()
+        for partition_record in partition_records:
+            match_partition_expression: BooleanExpression = AlwaysTrue()
+
+            for pos in range(len(partition_fields)):
+                predicate = (
+                    EqualTo(Reference(partition_fields[pos]), partition_record[pos])
+                    if partition_record[pos] is not None
+                    else IsNull(Reference(partition_fields[pos]))
+                )
+                match_partition_expression = And(match_partition_expression, predicate)
+            expr = Or(expr, match_partition_expression)
+        return expr
+
+    def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        """
+        Shorthand for adding a table dynamic overwrite with a PyArrow table to the transaction.
+
+        Args:
+            df: The Arrow dataframe that will be used to overwrite the table
+            snapshot_properties: Custom properties to be added to the snapshot summary
+        """
+        try:
+            import pyarrow as pa
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e
+
+        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files
+
+        if not isinstance(df, pa.Table):
+            raise ValueError(f"Expected PyArrow table, got: {df}")
+
+        if self.table_metadata.spec().is_unpartitioned():
+            raise ValueError("Cannot apply dynamic overwrite on an unpartitioned table.")
+
+        for field in self.table_metadata.spec().fields:
+            if not isinstance(field.transform, IdentityTransform):
+                raise ValueError(
+                    f"For now dynamic overwrite does not support a table with non-identity-transform field in the latest partition spec: {field}"
+                )
+
+        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
+        _check_pyarrow_schema_compatible(
+            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
+        )
+
+        # If dataframe does not have data, there is no need to overwrite
+        if df.shape[0] == 0:
+            return
+
+        append_snapshot_commit_uuid = uuid.uuid4()
+        data_files: List[DataFile] = list(
+            _dataframe_to_data_files(
+                table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io
+            )
+        )
+
+        overlapping_partitions = [data_file.partition for data_file in data_files]

Review Comment:
   nit: Should we turn this into a set of partitions instead, and run `_build_partition_predicate` on a unique set of partition records? Also, a thought for a rename: `partitions_to_overwrite`?
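For illustration, here is a minimal standalone sketch of the suggestion above: deduplicate the partition values before building the OR-of-ANDs predicate, so each partition contributes a single clause even when many data files share it. The helper name `build_partition_predicate_unique`, the tuple-based partition representation, and the field names are assumptions for this example; the PR itself works with pyiceberg `Record` objects and the table's partition spec.

```python
from typing import Any, Iterable, Tuple

from pyiceberg.expressions import (
    AlwaysFalse,
    AlwaysTrue,
    And,
    BooleanExpression,
    EqualTo,
    IsNull,
    Or,
    Reference,
)


def build_partition_predicate_unique(
    partition_field_names: Tuple[str, ...],
    partition_values: Iterable[Tuple[Any, ...]],
) -> BooleanExpression:
    """Match any of the *distinct* partition tuples (hypothetical helper)."""
    expr: BooleanExpression = AlwaysFalse()
    # set() drops duplicate partition tuples, so each partition contributes
    # exactly one AND clause to the final OR expression.
    for values in set(partition_values):
        clause: BooleanExpression = AlwaysTrue()
        for name, value in zip(partition_field_names, values):
            # NULL partition values need IsNull rather than EqualTo, mirroring
            # the PR's _build_partition_predicate.
            clause = And(clause, EqualTo(Reference(name), value) if value is not None else IsNull(Reference(name)))
        expr = Or(expr, clause)
    return expr


# Three data files, two of which share the same partition -> only two clauses.
predicate = build_partition_predicate_unique(
    ("region", "day"),
    [("us", 1), ("us", 1), ("eu", None)],
)
print(predicate)
```

If the real partition `Record` values are hashable, the same idea applies directly as `partitions_to_overwrite = {data_file.partition for data_file in data_files}`; otherwise they would need to be converted to a hashable form (e.g. tuples) before deduplication.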
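For broader context, a hedged sketch of how the new API might be exercised end to end, assuming this PR's `dynamic_overwrite` lands on `Transaction` as shown in the diff. The catalog name, table identifier, and column names are placeholders; the target table is assumed to be identity-partitioned on `region` and `day`.

```python
import pyarrow as pa

from pyiceberg.catalog import load_catalog

# Placeholder catalog and table; the table is assumed to use identity partitioning.
catalog = load_catalog("default")
table = catalog.load_table("db.events")

# Rows covering two partitions; other partitions in the table are left untouched.
df = pa.table({"region": ["us", "us", "eu"], "day": [1, 1, 2], "value": [10, 20, 30]})

# The transaction commits on exit; only the partitions present in `df` are replaced.
with table.transaction() as txn:
    txn.dynamic_overwrite(df)
```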