syun64 commented on code in PR #931: URL: https://github.com/apache/iceberg-python/pull/931#discussion_r1678319796
########## pyiceberg/table/__init__.py: ########## @@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) for data_file in data_files: append_files.append_data_file(data_file) + def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression: + partition_spec = self.table_metadata.spec() + schema = self.table_metadata.schema() + partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields] + + expr: BooleanExpression = AlwaysFalse() + for partition_record in delete_partitions: + match_partition_expression: BooleanExpression = AlwaysTrue() + + for pos in range(len(partition_fields)): + predicate = ( + EqualTo(Reference(partition_fields[pos]), partition_record[pos]) + if partition_record[pos] is not None + else IsNull(Reference(partition_fields[pos])) + ) + match_partition_expression = And(match_partition_expression, predicate) + expr = Or(expr, match_partition_expression) + return expr + + def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + """ + Shorthand for adding a table dynamic overwrite with a PyArrow table to the transaction. 
+ + Args: + df: The Arrow dataframe that will be used to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary + """ + try: + import pyarrow as pa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e + + if not isinstance(df, pa.Table): + raise ValueError(f"Expected PyArrow table, got: {df}") + + _check_schema_compatible(self._table.schema(), other_schema=df.schema) + + # cast if the two schemas are compatible but not equal + table_arrow_schema = self._table.schema().as_arrow() + if table_arrow_schema != df.schema: + df = df.cast(table_arrow_schema) + + # If dataframe does not have data, there is no need to overwrite + if df.shape[0] == 0: + return + + append_snapshot_commit_uuid = uuid.uuid4() + data_files: List[DataFile] = list( + _dataframe_to_data_files( + table_metadata=self._table.metadata, write_uuid=append_snapshot_commit_uuid, df=df, io=self._table.io + ) + ) + with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot: + delete_partitions = [data_file.partition for data_file in data_files] + delete_filter = self._build_partition_predicate( + spec_id=self.table_metadata.spec().spec_id, delete_partitions=delete_partitions + ) + delete_snapshot.delete_by_predicate(delete_filter) + + with self.update_snapshot(snapshot_properties=snapshot_properties).fast_append( + append_snapshot_commit_uuid + ) as append_snapshot: Review Comment: What are your thoughts on using `merge_append` when `MANIFEST_MERGE_ENABLED` property is set to true? In the Java implementation append, overwrite and replacePartitions (dynamic overwrite) all use the MergingSnapshotProducer which respects this property. 
```suggestion manifest_merge_enabled = PropertyUtil.property_as_bool( self.table_metadata.properties, TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT, ) update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties) append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append with append_method() as append_snapshot: ``` ########## pyiceberg/table/__init__.py: ########## @@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) for data_file in data_files: append_files.append_data_file(data_file) + def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression: + partition_spec = self.table_metadata.spec() + schema = self.table_metadata.schema() + partition_fields = [schema.find_field(field.source_id).name for field in partition_spec.fields] + + expr: BooleanExpression = AlwaysFalse() + for partition_record in delete_partitions: + match_partition_expression: BooleanExpression = AlwaysTrue() + + for pos in range(len(partition_fields)): + predicate = ( + EqualTo(Reference(partition_fields[pos]), partition_record[pos]) + if partition_record[pos] is not None + else IsNull(Reference(partition_fields[pos])) + ) + match_partition_expression = And(match_partition_expression, predicate) + expr = Or(expr, match_partition_expression) + return expr + + def dynamic_overwrite(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None: + """ + Shorthand for adding a table dynamic overwrite with a PyArrow table to the transaction. 
+ + Args: + df: The Arrow dataframe that will be used to overwrite the table + snapshot_properties: Custom properties to be added to the snapshot summary + """ + try: + import pyarrow as pa + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e + + if not isinstance(df, pa.Table): + raise ValueError(f"Expected PyArrow table, got: {df}") + + _check_schema_compatible(self._table.schema(), other_schema=df.schema) + + # cast if the two schemas are compatible but not equal + table_arrow_schema = self._table.schema().as_arrow() + if table_arrow_schema != df.schema: + df = df.cast(table_arrow_schema) Review Comment: Just an FYI: there's a [PR](https://github.com/apache/iceberg-python/pull/921/files) that changes this block of code ########## pyiceberg/table/__init__.py: ########## @@ -502,6 +503,71 @@ def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) for data_file in data_files: append_files.append_data_file(data_file) + def _build_partition_predicate(self, spec_id: int, delete_partitions: List[Record]) -> BooleanExpression: Review Comment: nit: I found the `delete_partitions` argument a bit confusing here, because this function just translates a set of partition record values to its corresponding predicate. Could we rename it to something more generic to indicate that? We should also remove `spec_id`, which isn't used in this function. ```suggestion def _build_partition_predicate(self, partition_records: List[Record]) -> BooleanExpression: ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org