HonahX commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1584249768
##########
pyiceberg/table/__init__.py:
##########
@@ -443,6 +468,54 @@ def overwrite(
        for data_file in data_files:
            update_snapshot.append_data_file(data_file)
+    def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        if (
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
+            == TableProperties.DELETE_MODE_MERGE_ON_READ
+        ):
+            raise NotImplementedError("Merge on read is not yet supported")
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        # Check if there are any files that require an actual rewrite of a data file
+        if delete_snapshot.rewrites_needed is True:
+            # When we want to filter out certain rows, we want to invert the expression
+            # delete id = 22 means that we want to look for that value, and then remove
+            # if from the Parquet file
+            delete_row_filter = Not(delete_filter)

Review Comment:
How about `preserve_row_filter` or `rows_to_keep_filter`? I find it more straightforward to say "When we want to filter out certain rows, we want to preserve the rest of the rows that do not meet the filter." But this is totally personal preference.

##########
pyiceberg/table/__init__.py:
##########
@@ -434,6 +458,9 @@ def overwrite(
        if table_arrow_schema != df.schema:
            df = df.cast(table_arrow_schema)
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(overwrite_filter)

Review Comment:
My understanding is that, currently, it will:
- delete a data file if all of its rows satisfy the `overwrite_filter`
- append the df to the table

Is this the expected behavior? It looks like we could even change the `overwrite` below to `append`, because we only `append_data_file`. I feel it would be reasonable for `overwrite_filter` to have the same functionality as `delete_filter` below, so that we partially overwrite a data file if only some of its rows match the filter.

##########
pyiceberg/table/__init__.py:
##########
@@ -443,6 +468,54 @@ def overwrite(
        for data_file in data_files:
            update_snapshot.append_data_file(data_file)
+    def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        if (
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
+            == TableProperties.DELETE_MODE_MERGE_ON_READ
+        ):
+            raise NotImplementedError("Merge on read is not yet supported")
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        # Check if there are any files that require an actual rewrite of a data file
+        if delete_snapshot.rewrites_needed is True:
+            # When we want to filter out certain rows, we want to invert the expression
+            # delete id = 22 means that we want to look for that value, and then remove
+            # if from the Parquet file
+            delete_row_filter = Not(delete_filter)
+            with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
+                # Potential optimization is where we check if the files actually contain relevant data.
+                files = self._scan(row_filter=delete_filter).plan_files()
+
+                counter = itertools.count(0)
+
+                # This will load the Parquet file into memory, including:
+                # - Filter out the rows based on the delete filter
+                # - Projecting it to the current schema
+                # - Applying the positional deletes if they are there
+                # When writing
+                # - Apply the latest partition-spec
+                # - And sort order when added
+                for original_file in files:
+                    df = project_table(
+                        tasks=[original_file],
+                        table_metadata=self._table.metadata,
+                        io=self._table.io,
+                        row_filter=delete_row_filter,
+                        projected_schema=self.table_metadata.schema(),
+                    )
+                    for data_file in _dataframe_to_data_files(
+                        io=self._table.io,
+                        df=df,
+                        table_metadata=self._table.metadata,
+                        write_uuid=overwrite_snapshot.commit_uuid,

Review Comment:
Maybe as a follow-up: is it possible to parallelize the overwrite here? For example, having a `_dataframes_to_data_files` that generates WriteTasks for multiple dataframes and calls `write_parquet` once (a rough sketch of this idea is included after this comment block).

##########
pyiceberg/table/__init__.py:
##########
@@ -2897,12 +2959,161 @@ def _commit(self) -> UpdatesAndRequirements:
            ),
            (
                AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid),
-                AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"),
+                AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),
            ),
        )
-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+    """Will delete manifest entries from the current snapshot based on the predicate.
+
+    This will produce a DELETE snapshot:
+        Data files were removed and their contents logically deleted and/or delete
+        files were added to delete rows.
+
+    From the specification
+    """
+
+    _predicate: BooleanExpression
+
+    def __init__(
+        self,
+        operation: Operation,
+        transaction: Transaction,
+        io: FileIO,
+        commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        self._predicate = AlwaysFalse()
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to delete
+        if self.files_affected:
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        project = inclusive_projection(schema, spec)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)
+
+    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:

Review Comment:
Seems this is not used.
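To make the parallelization idea above concrete, here is a minimal sketch of what a hypothetical `_dataframes_to_data_files` helper could look like. It is only an illustration, not the PR's implementation: it assumes it lives in `pyiceberg/table/__init__.py` next to the existing `_dataframe_to_data_files` quoted in the diff, and it simply fans the per-dataframe writes out over a thread pool instead of batching WriteTasks into a single `write_parquet` call, since those internals are not quoted here.

```python
# Hypothetical sketch only: the helper name, the ThreadPoolExecutor usage, and the
# loose typing are assumptions; `_dataframe_to_data_files` is the existing helper
# shown in the diff above.
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Iterable, List
import uuid


def _dataframes_to_data_files(
    io: Any, table_metadata: Any, write_uuid: uuid.UUID, dfs: Iterable[Any]
) -> List[Any]:
    """Rewrite several Arrow tables concurrently and collect the resulting data files."""

    def _write_one(df: Any) -> List[Any]:
        # Delegate to the existing single-dataframe path from the PR.
        return list(_dataframe_to_data_files(io=io, df=df, table_metadata=table_metadata, write_uuid=write_uuid))

    with ThreadPoolExecutor() as pool:
        # Each dataframe is independent, so the file writes can overlap.
        chunks = pool.map(_write_one, dfs)
    return [data_file for chunk in chunks for data_file in chunk]
```

With something like this, the rewrite loop in `delete` could first collect the projected tables and then hand them to the helper in one call, appending the returned data files to `overwrite_snapshot` afterwards.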
##########
pyiceberg/table/__init__.py:
##########
@@ -2897,12 +2959,161 @@ def _commit(self) -> UpdatesAndRequirements:
            ),
            (
                AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid),
-                AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"),
+                AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),
            ),
        )
-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+    """Will delete manifest entries from the current snapshot based on the predicate.
+
+    This will produce a DELETE snapshot:
+        Data files were removed and their contents logically deleted and/or delete
+        files were added to delete rows.
+
+    From the specification
+    """
+
+    _predicate: BooleanExpression
+
+    def __init__(
+        self,
+        operation: Operation,
+        transaction: Transaction,
+        io: FileIO,
+        commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        self._predicate = AlwaysFalse()
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to delete
+        if self.files_affected:
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        project = inclusive_projection(schema, spec)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)
+
+    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        partition_type = spec.partition_type(schema)
+        partition_schema = Schema(*partition_type.fields)
+        partition_expr = self.partition_filters[spec_id]
+
+        return lambda data_file: expression_evaluator(partition_schema, partition_expr, case_sensitive=True)(data_file.partition)

Review Comment:
This portion of code is mostly the same as [DataScan](https://github.com/apache/iceberg-python/blob/d65a8a42c4c3dc1d12c7bc146d19a7831cb752e8/pyiceberg/table/__init__.py#L1718-L1739). Shall we extract it into a separate class (e.g. `EvaluatorGenerator`)?

##########
pyiceberg/table/__init__.py:
##########
@@ -292,7 +303,13 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ
                requirement.validate(self.table_metadata)
        self._updates += updates
-        self._requirements += requirements
+
+        # For the requirements, it does not make sense to add a requirement more than once
+        # For example, you cannot assert that the current schema has two different IDs
+        existing_requirements = {type(requirement) for requirement in self._requirements}
+        for new_requirement in requirements:
+            if type(new_requirement) not in existing_requirements:
+                self._requirements = self._requirements + requirements

Review Comment:
I feel that the term "warning" might be too strong for this case.
A warning typically implies that the user is doing something that is not recommended or may not work as expected. However, as @Fokko mentioned, it is common to have duplicate requirements in one transaction; in fact, wrapping multiple updates in a single transaction is encouraged. I think removing duplicates is just an optimization that should happen silently in the backend. Alternatively, we could deduplicate once when passing the requirements to `self._table._do_commit`. What do you think?

##########
tests/integration/test_deletes.py:
##########
@@ -0,0 +1,257 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+from typing import List
+
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog.rest import RestCatalog
+from pyiceberg.expressions import EqualTo
+from pyiceberg.table.snapshots import Operation, Summary
+
+
+def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
+    for sql in sqls:
+        spark.sql(sql)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
+    identifier = 'default.table_partitioned_delete'
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+            """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+            """,
+            f"""
+            INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+            """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number_partitioned", 10))
+
+    # No overwrite operation
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'delete']
+    assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 30]}
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
+    identifier = 'default.table_partitioned_delete'
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+            """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+            """,
+            f"""
+            INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+            """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number", 20))
+
+    # We don't delete a whole partition, so there is only a overwrite
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'overwrite']
+    assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 10], 'number': [30, 30]}
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
+    identifier = 'default.table_partitioned_delete'
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned int,
+                number int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+            """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+            """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number_partitioned", 22))  # Does not affect any data
+
+    # Open for discussion, do we want to create a new snapshot?

Review Comment:
I'm almost +1 on not creating a new snapshot if nothing gets deleted. The only concern I have is the `snapshot_properties` argument: `append` and `overwrite` will create a new snapshot anyway, so the `snapshot_properties` are always applied. In the case of `delete`, these properties will be silently ignored if there is no match. Shall we add a warning that the snapshot properties will not be applied if no files get deleted?
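Regarding that last point, a minimal sketch of what such a guard could look like in `DeleteFiles._commit`, assuming the producer keeps the caller-supplied properties on an attribute such as `self._snapshot_properties` (the attribute name and the use of the `warnings` module are assumptions, not the actual pyiceberg behavior):

```python
import warnings


def _commit(self) -> UpdatesAndRequirements:
    # Method on DeleteFiles, shown standalone for brevity.
    # Only produce a commit when there is something to delete.
    if self.files_affected:
        return super()._commit()

    # Nothing matched the predicate: no snapshot is written, so any
    # caller-supplied snapshot properties would otherwise be dropped silently.
    if self._snapshot_properties:  # hypothetical attribute name
        warnings.warn("Delete operation did not match any records; snapshot_properties will not be applied")
    return (), ()
```

This keeps the no-op behavior (no empty snapshot) while making the dropped `snapshot_properties` visible to the caller.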