HonahX commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1584249768


##########
pyiceberg/table/__init__.py:
##########
@@ -443,6 +468,54 @@ def overwrite(
                 for data_file in data_files:
                     update_snapshot.append_data_file(data_file)
 
+    def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        if (
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
+            == TableProperties.DELETE_MODE_MERGE_ON_READ
+        ):
+            raise NotImplementedError("Merge on read is not yet supported")
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        # Check if there are any files that require an actual rewrite of a data file
+        if delete_snapshot.rewrites_needed is True:
+            # When we want to filter out certain rows, we want to invert the expression
+            # delete id = 22 means that we want to look for that value, and then remove
+            # it from the Parquet file
+            delete_row_filter = Not(delete_filter)

Review Comment:
   How about `preserve_row_filter` or `rows_to_keep_filter`? I feel it is more straightforward to say "When we want to filter out certain rows, we want to preserve the rest of the rows that do not meet the filter." But this is totally personal preference.
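   For example, just to illustrate the naming (using the same pyiceberg expressions as the diff above; the `id = 22` filter is only an example):

```python
from pyiceberg.expressions import EqualTo, Not

delete_filter = EqualTo("id", 22)         # rows we want to remove
preserve_row_filter = Not(delete_filter)  # rows we want to keep when rewriting the file
```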



##########
pyiceberg/table/__init__.py:
##########
@@ -434,6 +458,9 @@ def overwrite(
         if table_arrow_schema != df.schema:
             df = df.cast(table_arrow_schema)
 
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(overwrite_filter)

Review Comment:
   My understanding is that, currently, it will:
   
   - delete a data file if all of its rows satisfy the `overwrite_filter`
   - append the df to the table.
   
   Is this the expected behavior? It looks like we could even change the `overwrite` below to `append`, because we only `append_data_file`.
   
   I feel it would be reasonable for `overwrite_filter` to have the same functionality as `delete_filter` below, so that we partially rewrite a data file when only some of its rows match the filter.
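   In other words, a rough sketch of the suggested semantics only (not of the transactional implementation; the helper name is made up for illustration):

```python
import pyarrow as pa

from pyiceberg.expressions import BooleanExpression
from pyiceberg.table import Table
from pyiceberg.typedef import EMPTY_DICT, Properties


def overwrite_as_delete_plus_append(
    tbl: Table, df: pa.Table, overwrite_filter: BooleanExpression, snapshot_properties: Properties = EMPTY_DICT
) -> None:
    # Suggested behavior: an overwrite with a filter acts like delete(filter) followed by
    # append(df), so data files that only partially match the filter get partially rewritten.
    tbl.delete(delete_filter=overwrite_filter, snapshot_properties=snapshot_properties)
    tbl.append(df, snapshot_properties=snapshot_properties)
```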



##########
pyiceberg/table/__init__.py:
##########
@@ -443,6 +468,54 @@ def overwrite(
                 for data_file in data_files:
                     update_snapshot.append_data_file(data_file)
 
+    def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        if (
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
+            == TableProperties.DELETE_MODE_MERGE_ON_READ
+        ):
+            raise NotImplementedError("Merge on read is not yet supported")
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        # Check if there are any files that require an actual rewrite of a data file
+        if delete_snapshot.rewrites_needed is True:
+            # When we want to filter out certain rows, we want to invert the expression
+            # delete id = 22 means that we want to look for that value, and then remove
+            # it from the Parquet file
+            delete_row_filter = Not(delete_filter)
+            with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
+                # Potential optimization is where we check if the files actually contain relevant data.
+                files = self._scan(row_filter=delete_filter).plan_files()
+
+                counter = itertools.count(0)
+
+                # This will load the Parquet file into memory, including:
+                #   - Filter out the rows based on the delete filter
+                #   - Projecting it to the current schema
+                #   - Applying the positional deletes if they are there
+                # When writing
+                #   - Apply the latest partition-spec
+                #   - And sort order when added
+                for original_file in files:
+                    df = project_table(
+                        tasks=[original_file],
+                        table_metadata=self._table.metadata,
+                        io=self._table.io,
+                        row_filter=delete_row_filter,
+                        projected_schema=self.table_metadata.schema(),
+                    )
+                    for data_file in _dataframe_to_data_files(
+                        io=self._table.io,
+                        df=df,
+                        table_metadata=self._table.metadata,
+                        write_uuid=overwrite_snapshot.commit_uuid,

Review Comment:
   Maybe as a follow-up: is it possible to parallelize the overwrite here? E.g. having a `_dataframes_to_data_files` that generates WriteTasks for multiple dataframes and calls `write_parquet` once.
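   Not the actual API, and simpler than generating WriteTasks for a single `write_parquet` call, but one way such a helper could be shaped (the function name, import paths, and executor choice are assumptions):

```python
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Iterable, Iterator

import pyarrow as pa

from pyiceberg.io import FileIO
from pyiceberg.manifest import DataFile
from pyiceberg.table import _dataframe_to_data_files
from pyiceberg.table.metadata import TableMetadata


def _dataframes_to_data_files(
    io: FileIO, dfs: Iterable[pa.Table], table_metadata: TableMetadata, write_uuid: uuid.UUID
) -> Iterator[DataFile]:
    # Fan the per-dataframe writes out over a thread pool; list() forces each write to
    # actually happen inside the worker thread rather than lazily in the caller.
    with ThreadPoolExecutor() as pool:
        futures = [
            pool.submit(
                lambda df=df: list(
                    _dataframe_to_data_files(io=io, df=df, table_metadata=table_metadata, write_uuid=write_uuid)
                )
            )
            for df in dfs
        ]
        for future in futures:
            yield from future.result()
```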



##########
pyiceberg/table/__init__.py:
##########
@@ -2897,12 +2959,161 @@ def _commit(self) -> UpdatesAndRequirements:
             ),
             (
                 AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid),
-                AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"),
+                AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),
             ),
         )
 
 
-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+    """Will delete manifest entries from the current snapshot based on the predicate.
+
+    This will produce a DELETE snapshot:
+        Data files were removed and their contents logically deleted and/or delete
+        files were added to delete rows.
+
+    From the specification
+    """
+
+    _predicate: BooleanExpression
+
+    def __init__(
+        self,
+        operation: Operation,
+        transaction: Transaction,
+        io: FileIO,
+        commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        self._predicate = AlwaysFalse()
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to delete
+        if self.files_affected:
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        project = inclusive_projection(schema, spec)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)
+
+    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:

Review Comment:
   Seems this is not used.



##########
pyiceberg/table/__init__.py:
##########
@@ -2897,12 +2959,161 @@ def _commit(self) -> UpdatesAndRequirements:
             ),
             (
                 AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid),
-                AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"),
+                AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),
             ),
         )
 
 
-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+    """Will delete manifest entries from the current snapshot based on the predicate.
+
+    This will produce a DELETE snapshot:
+        Data files were removed and their contents logically deleted and/or delete
+        files were added to delete rows.
+
+    From the specification
+    """
+
+    _predicate: BooleanExpression
+
+    def __init__(
+        self,
+        operation: Operation,
+        transaction: Transaction,
+        io: FileIO,
+        commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        self._predicate = AlwaysFalse()
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to delete
+        if self.files_affected:
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        project = inclusive_projection(schema, spec)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)
+
+    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        partition_type = spec.partition_type(schema)
+        partition_schema = Schema(*partition_type.fields)
+        partition_expr = self.partition_filters[spec_id]
+
+        return lambda data_file: expression_evaluator(partition_schema, partition_expr, case_sensitive=True)(data_file.partition)

Review Comment:
   This portion of code is mostly the same as [DataScan](https://github.com/apache/iceberg-python/blob/d65a8a42c4c3dc1d12c7bc146d19a7831cb752e8/pyiceberg/table/__init__.py#L1718-L1739). Shall we extract it into another class (e.g. `EvaluatorGenerator`)?
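   Purely as an illustration of the extraction (the class name and its placement are assumptions; the method bodies mirror the duplicated code above):

```python
from functools import cached_property
from typing import Callable

from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import expression_evaluator, inclusive_projection, manifest_evaluator
from pyiceberg.manifest import DataFile, ManifestFile
from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.typedef import KeyDefaultDict


class EvaluatorGenerator:
    """Sketch of the evaluator-building logic shared by DataScan and DeleteFiles."""

    def __init__(self, table_metadata: TableMetadata, predicate: BooleanExpression, case_sensitive: bool = True):
        self._table_metadata = table_metadata
        self._predicate = predicate
        self._case_sensitive = case_sensitive

    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
        schema = self._table_metadata.schema()
        spec = self._table_metadata.specs()[spec_id]
        return inclusive_projection(schema, spec)(self._predicate)

    @cached_property
    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
        return KeyDefaultDict(self._build_partition_projection)

    def manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
        schema = self._table_metadata.schema()
        spec = self._table_metadata.specs()[spec_id]
        return manifest_evaluator(spec, schema, self.partition_filters[spec_id], self._case_sensitive)

    def partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
        schema = self._table_metadata.schema()
        spec = self._table_metadata.specs()[spec_id]
        partition_schema = Schema(*spec.partition_type(schema).fields)
        partition_expr = self.partition_filters[spec_id]
        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self._case_sensitive)(
            data_file.partition
        )
```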



##########
pyiceberg/table/__init__.py:
##########
@@ -292,7 +303,13 @@ def _apply(self, updates: Tuple[TableUpdate, ...], requirements: Tuple[TableRequ
             requirement.validate(self.table_metadata)
 
         self._updates += updates
-        self._requirements += requirements
+
+        # For the requirements, it does not make sense to add a requirement more than once
+        # For example, you cannot assert that the current schema has two different IDs
+        existing_requirements = {type(requirement) for requirement in self._requirements}
+        for new_requirement in requirements:
+            if type(new_requirement) not in existing_requirements:
+                self._requirements = self._requirements + requirements

Review Comment:
   I feel that the term "warning" might be too strong for this case. A warning typically implies that the user is doing something not recommended or that may not work as expected. However, as @Fokko mentioned, it is common to have duplicate requirements in one transaction; in fact, wrapping multiple updates in a single transaction is encouraged. I think removing duplicates is just an optimization that should happen silently in the backend.
   
   Alternatively, we could remove duplicates once, when passing the requirements to `self._table._do_commit`.
   
   What do you think?
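   For the alternative, a minimal sketch of deduplicating by requirement type at commit time (the helper name is made up for illustration):

```python
from typing import List, Set, Tuple, Type

from pyiceberg.table import TableRequirement


def _dedup_requirements(requirements: Tuple[TableRequirement, ...]) -> Tuple[TableRequirement, ...]:
    # Keep only the first requirement of each type, preserving order
    seen: Set[Type[TableRequirement]] = set()
    deduped: List[TableRequirement] = []
    for requirement in requirements:
        if type(requirement) not in seen:
            seen.add(type(requirement))
            deduped.append(requirement)
    return tuple(deduped)
```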



##########
tests/integration/test_deletes.py:
##########
@@ -0,0 +1,257 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+# pylint:disable=redefined-outer-name
+from typing import List
+
+import pytest
+from pyspark.sql import SparkSession
+
+from pyiceberg.catalog.rest import RestCatalog
+from pyiceberg.expressions import EqualTo
+from pyiceberg.table.snapshots import Operation, Summary
+
+
+def run_spark_commands(spark: SparkSession, sqls: List[str]) -> None:
+    for sql in sqls:
+        spark.sql(sql)
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_partitioned_table_delete_full_file(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
+    identifier = 'default.table_partitioned_delete'
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned  int,
+                number              int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number_partitioned", 10))
+
+    # No overwrite operation
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'delete']
+    assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 11], 'number': [20, 30]}
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_partitioned_table_rewrite(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
+    identifier = 'default.table_partitioned_delete'
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned  int,
+                number              int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (11, 20), (11, 30)
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number", 20))
+
+    # We don't delete a whole partition, so there is only an overwrite
+    assert [snapshot.summary.operation.value for snapshot in tbl.snapshots()] == ['append', 'append', 'overwrite']
+    assert tbl.scan().to_arrow().to_pydict() == {'number_partitioned': [11, 10], 'number': [30, 30]}
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [1, 2])
+def test_partitioned_table_no_match(spark: SparkSession, session_catalog: RestCatalog, format_version: int) -> None:
+    identifier = 'default.table_partitioned_delete'
+
+    run_spark_commands(
+        spark,
+        [
+            f"DROP TABLE IF EXISTS {identifier}",
+            f"""
+            CREATE TABLE {identifier} (
+                number_partitioned  int,
+                number              int
+            )
+            USING iceberg
+            PARTITIONED BY (number_partitioned)
+            TBLPROPERTIES('format-version' = {format_version})
+        """,
+            f"""
+            INSERT INTO {identifier} VALUES (10, 20), (10, 30)
+        """,
+        ],
+    )
+
+    tbl = session_catalog.load_table(identifier)
+    tbl.delete(EqualTo("number_partitioned", 22))  # Does not affect any data
+
+    # Open for discussion, do we want to create a new snapshot?

Review Comment:
   I'm almost +1 on not creating a new snapshot if nothing gets deleted.
   
   The only concern I have is the `snapshot_properties` argument. `append` and `overwrite` will create a new snapshot anyway, so the `snapshot_properties` are always applied. In the case of `delete`, these properties will be silently ignored if there is no match. Shall we add a warning that the snapshot properties will not be applied if no files were deleted?
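   A rough sketch of what that could look like, assuming we keep the `files_affected` check from the `DeleteFiles` producer (the helper is hypothetical, not the actual implementation):

```python
import warnings

from pyiceberg.typedef import EMPTY_DICT, Properties


def _warn_if_delete_was_a_noop(files_affected: bool, snapshot_properties: Properties = EMPTY_DICT) -> None:
    # Hypothetical helper: surface that snapshot_properties were dropped because the
    # delete matched nothing, so no snapshot (and no summary) was committed.
    if not files_affected and snapshot_properties:
        warnings.warn("Delete did not match any records; snapshot_properties were not applied")
```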



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

