Fokko commented on code in PR #1938:
URL: https://github.com/apache/iceberg-python/pull/1938#discussion_r2071299989


##########
pyiceberg/table/update/validate.py:
##########
@@ -69,3 +75,74 @@ def validation_history(
         raise ValidationException("No matching snapshot found.")
 
     return manifests_files, snapshots
+
+
+def deleted_data_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: Optional[BooleanExpression],
+    parent_snapshot: Optional[Snapshot],
+    partition_set: Optional[set[Record]],
+) -> Iterator[ManifestEntry]:
+    """Find deleted data files matching a filter since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find deleted data files
+        partition_set: a set of partitions to find deleted data files
+        parent_snapshot: Ending snapshot on the branch being validated
+
+    Returns:
+        List of deleted data files matching the filter
+    """
+    # if there is no current table state, no files have been deleted
+    if parent_snapshot is None:
+        return
+
+    manifests, snapshot_ids = validation_history(
+        table,
+        starting_snapshot,
+        parent_snapshot,
+        VALIDATE_DATA_FILES_EXIST_OPERATIONS,
+        ManifestContent.DATA,
+    )
+
+    if data_filter is not None:
+        evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval
+
+    for manifest in manifests:
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
+            if entry.snapshot_id not in snapshot_ids:
+                continue
+
+            if entry.status != ManifestEntryStatus.DELETED:
+                continue
+
+            if data_filter is not None and not evaluator(entry.data_file):
+                continue
+
+            if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set:
+                continue
+
+            yield entry
+
+
+def validate_deleted_data_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: Optional[BooleanExpression],
+    parent_snapshot: Snapshot,
+) -> None:
+    """Validate that no files matching a filter have been deleted from the table since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find deleted data files
+        parent_snapshot: Ending snapshot on the branch being validated
+
+    """
+    conflicting_entries = deleted_data_files(table, starting_snapshot, data_filter, None, parent_snapshot)
+    if any(conflicting_entries):
+        raise ValidationException("Deleted data files were found matching the filter.")

Review Comment:
   It would be nice to also include the conflicting snapshot ID(s) in the exception message.
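
   A minimal sketch of what that could look like, not a definitive implementation. `Entry` and `raise_on_conflicts` are hypothetical names, `ValidationException` is redefined locally so the snippet runs on its own, and the message wording is only a suggestion:

```python
from dataclasses import dataclass
from typing import Iterable


class ValidationException(Exception):
    """Local stand-in for pyiceberg's ValidationException."""


@dataclass(frozen=True)
class Entry:
    """Hypothetical stand-in for a ManifestEntry; only snapshot_id is modelled."""

    snapshot_id: int


def raise_on_conflicts(conflicting_entries: Iterable[Entry]) -> None:
    # Collect the distinct snapshot IDs so the message names each one once.
    snapshot_ids = sorted({entry.snapshot_id for entry in conflicting_entries})
    if snapshot_ids:
        ids = ", ".join(str(snapshot_id) for snapshot_id in snapshot_ids)
        raise ValidationException(
            f"Deleted data files were found matching the filter for snapshot ids: {ids}"
        )
```

   Note that this consumes the whole iterator, whereas `any(...)` stops at the first conflicting entry, so it trades a bit of extra work for a more useful message.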



##########
pyiceberg/table/update/validate.py:
##########
@@ -69,3 +75,74 @@ def validation_history(
         raise ValidationException("No matching snapshot found.")
 
     return manifests_files, snapshots
+
+
+def deleted_data_files(

Review Comment:
   Can we mark these as private? I don't think we want to expose this directly 
to the user, and marking it private makes it easier to change the signature 
later on (since we don't have to go through the deprecation cycle).
   ```suggestion
   def _deleted_data_files(
   ```



##########
pyiceberg/table/update/validate.py:
##########
@@ -69,3 +75,74 @@ def validation_history(
         raise ValidationException("No matching snapshot found.")
 
     return manifests_files, snapshots
+
+
+def deleted_data_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: Optional[BooleanExpression],
+    parent_snapshot: Optional[Snapshot],
+    partition_set: Optional[set[Record]],
+) -> Iterator[ManifestEntry]:
+    """Find deleted data files matching a filter since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find deleted data files
+        partition_set: a set of partitions to find deleted data files
+        parent_snapshot: Ending snapshot on the branch being validated
+
+    Returns:
+        List of deleted data files matching the filter

Review Comment:
   ```suggestion
           List of conflicting manifest-entries
   ```



##########
pyiceberg/table/update/validate.py:
##########
@@ -69,3 +75,74 @@ def validation_history(
         raise ValidationException("No matching snapshot found.")
 
     return manifests_files, snapshots
+
+
+def deleted_data_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: Optional[BooleanExpression],
+    parent_snapshot: Optional[Snapshot],
+    partition_set: Optional[set[Record]],
+) -> Iterator[ManifestEntry]:
+    """Find deleted data files matching a filter since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find deleted data files
+        partition_set: a set of partitions to find deleted data files
+        parent_snapshot: Ending snapshot on the branch being validated
+
+    Returns:
+        List of deleted data files matching the filter
+    """
+    # if there is no current table state, no files have been deleted
+    if parent_snapshot is None:
+        return
+
+    manifests, snapshot_ids = validation_history(
+        table,
+        starting_snapshot,
+        parent_snapshot,
+        VALIDATE_DATA_FILES_EXIST_OPERATIONS,
+        ManifestContent.DATA,
+    )
+
+    if data_filter is not None:
+        evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval
+
+    for manifest in manifests:
+        for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False):
+            if entry.snapshot_id not in snapshot_ids:
+                continue
+
+            if entry.status != ManifestEntryStatus.DELETED:
+                continue
+
+            if data_filter is not None and not evaluator(entry.data_file):

Review Comment:
   Maybe better to make it explicit:
   ```suggestion
               if data_filter is not None and evaluator(entry.data_file) is ROWS_CANNOT_MATCH:
   ```
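
   One pitfall when spelling this check out: in Python, `not x is y` parses as `not (x is y)`, so the skip condition must not carry a leading `not`. A small stand-alone sketch; the two constants are defined locally, on the assumption that they mirror the boolean constants in `pyiceberg.expressions.visitors`, and both function names are hypothetical:

```python
# Assumption: these mirror the boolean constants in pyiceberg.expressions.visitors.
ROWS_MIGHT_MATCH = True
ROWS_CANNOT_MATCH = False


def should_skip(result: bool) -> bool:
    """Skip an entry only when the evaluator says no row can match."""
    return result is ROWS_CANNOT_MATCH


def should_skip_with_stray_not(result: bool) -> bool:
    """Parses as `not (result is ROWS_CANNOT_MATCH)`, i.e. the inverted check."""
    return not result is ROWS_CANNOT_MATCH
```

   With these definitions, `should_skip(ROWS_CANNOT_MATCH)` is true, while the variant with the stray `not` would skip exactly the entries that might match.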



##########
pyiceberg/table/update/validate.py:
##########
@@ -69,3 +75,74 @@ def validation_history(
         raise ValidationException("No matching snapshot found.")
 
     return manifests_files, snapshots
+
+
+def deleted_data_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: Optional[BooleanExpression],
+    parent_snapshot: Optional[Snapshot],
+    partition_set: Optional[set[Record]],
+) -> Iterator[ManifestEntry]:
+    """Find deleted data files matching a filter since a starting snapshot.
+
+    Args:
+        table: Table to validate
+        starting_snapshot: Snapshot current at the start of the operation
+        data_filter: Expression used to find deleted data files
+        partition_set: a set of partitions to find deleted data files
+        parent_snapshot: Ending snapshot on the branch being validated
+
+    Returns:
+        List of deleted data files matching the filter
+    """
+    # if there is no current table state, no files have been deleted
+    if parent_snapshot is None:
+        return
+
+    manifests, snapshot_ids = validation_history(
+        table,
+        starting_snapshot,
+        parent_snapshot,
+        VALIDATE_DATA_FILES_EXIST_OPERATIONS,
+        ManifestContent.DATA,
+    )
+
+    if data_filter is not None:
+        evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval

Review Comment:
   Yes, I agree that this should be inclusive projection, since we want to know
whether there are any matches. Inclusive projection returns either
`rows_might_match` or `rows_cannot_match`. If the rows cannot match, we can skip the file :)
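
   To illustrate the difference between strict and inclusive evaluation, here is a toy sketch for a single predicate `col > literal` over a file's min/max bounds. `FileBounds` and both functions are hypothetical stand-ins, not the pyiceberg evaluators:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FileBounds:
    """Hypothetical stand-in for one column's min/max bounds in a data file."""

    lower: int
    upper: int


def inclusive_greater_than(bounds: FileBounds, literal: int) -> bool:
    # Rows might match `col > literal` as long as the largest value exceeds the literal.
    return bounds.upper > literal


def strict_greater_than(bounds: FileBounds, literal: int) -> bool:
    # All rows match `col > literal` only if even the smallest value exceeds the literal.
    return bounds.lower > literal


mixed_file = FileBounds(lower=1, upper=10)
print(inclusive_greater_than(mixed_file, 5))  # True
print(strict_greater_than(mixed_file, 5))  # False
```

   A deleted file with values from 1 to 10 does contain rows with `col > 5`, so it is a conflict. A strict check would skip it, while an inclusive check keeps it.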



##########
pyiceberg/table/update/validate.py:
##########
@@ -69,3 +75,74 @@ def validation_history(
         raise ValidationException("No matching snapshot found.")
 
     return manifests_files, snapshots
+
+
+def deleted_data_files(
+    table: Table,
+    starting_snapshot: Snapshot,
+    data_filter: Optional[BooleanExpression],
+    parent_snapshot: Optional[Snapshot],
+    partition_set: Optional[set[Record]],

Review Comment:
   I think this needs some more work. In addition to @sungwy's comment, consider
the code below:
   
   ```python
   (entry.data_file.spec_id, entry.data_file.partition) not in partition_set
   ```
   
   It checks whether a tuple is in `partition_set`, but according to the
signature, `partition_set` only contains `Record`s.
   
   This caught my attention, because if you do:
   
   ```sql
   ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(pickup_timestamp);
   
   -- and then
   ALTER TABLE prod.db.taxis REPLACE PARTITION FIELD pickup_timestamp WITH day(dropoff_timestamp);
   ```
   
   Both partitioning strategies produce a `Record[int]` holding the number of
days since epoch, but the meaning is completely different.
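
   A small sketch of why the set should be keyed on the spec ID as well as the record. Partition records are modelled as plain tuples here, and the spec IDs and `in_partition_set` are illustrative assumptions rather than values or helpers from a real table or from pyiceberg:

```python
from typing import Set, Tuple

# Hypothetical stand-in: a partition Record modelled as a plain tuple of values.
PartitionValue = Tuple[int, ...]

DAY_OF_PICKUP_SPEC_ID = 1  # illustrative spec IDs, not taken from a real table
DAY_OF_DROPOFF_SPEC_ID = 2

days_since_epoch: PartitionValue = (20000,)

# Keyed on the record alone, both specs produce the same element.
records_only: Set[PartitionValue] = {days_since_epoch}

# Keyed on (spec_id, record), the two specs stay distinct.
spec_and_record: Set[Tuple[int, PartitionValue]] = {
    (DAY_OF_PICKUP_SPEC_ID, days_since_epoch)
}


def in_partition_set(
    spec_id: int,
    partition: PartitionValue,
    partition_set: Set[Tuple[int, PartitionValue]],
) -> bool:
    # Same lookup shape as in the diff: a (spec_id, partition) tuple.
    return (spec_id, partition) in partition_set
```

   With a set of `(spec_id, record)` tuples, the pickup-day partition matches and the dropoff-day partition with the same day value does not. Looking up such a tuple in a set of bare records never matches at all.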



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

