jqin61 commented on code in PR #931: URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1678416870
########## pyiceberg/table/__init__.py: ########## @@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) for data_file in data_files: append_files.append_data_file(data_file) + def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression: + partition_spec = self.table_metadata.spec() + schema = self.table_metadata.schema() + partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields] + + expr: BooleanExpression = AlwaysFalse() + for partition_record in delete_partitions: + match_partition_expression: BooleanExpression = AlwaysTrue() + + for pos in range(len(partition_fields)): + predicate = ( + EqualTo(Reference(partition_fields[pos]), partition_record[pos]) + if partition_record[pos] is not None + else IsNull(Reference(partition_fields[pos])) + ) + match_partition_expression = And(match_partition_expression, predicate) + expr = Or(expr, match_partition_expression) + return expr + + def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + """ + Shorthand for adding a table dynamic overwrite with a PyArrow table to the transaction. 
+ + Args: + df: The Arrow dataframe that will be used to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary + """ + try: + import pyarrow as pa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e + + if not isinstance(df, pa.Table): + raise ValueError(f"Expected PyArrow table, got: {df}") + + _check_schema_compatible(self._table.schema(), other_schema=df.schema) + + # cast if the two schemas are compatible but not equal + table_arrow_schema = self._table.schema().as_arrow() + if table_arrow_schema != df.schema: + df = df.cast(table_arrow_schema) + + # If dataframe does not have data, there is no need to overwrite + if df.shape[0] == 0: + return + + append_snapshot_commit_uuid = uuid.uuid4() + data_files: List[DataFile] = list( + _dataframe_to_data_files( + table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io + ) + ) + with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot: + delete_partitions = [data_file.partition for data_file in data_files] + delete_filter = self._build_partition_predicate( + spec_id=self.table_metadata.spec().spec_id, delete_partitions=delete_partitions + ) + delete_snapshot.delete_by_predicate(delete_filter) + + with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append( + append_snapshot_commit_uuid + ) as append_snapshot: Review Comment: Yes, I think both transaction.dynamic_overwrite() and transaction.overwrite() could use it, just as transaction.append() does? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org