jayceslesar commented on code in PR #1938: URL: https://github.com/apache/iceberg-python/pull/1938#discussion_r2056909183
########## pyiceberg/table/update/validate.py: ########## @@ -0,0 +1,150 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from typing import Iterator, Optional + +from pyiceberg.expressions import BooleanExpression +from pyiceberg.expressions.visitors import _StrictMetricsEvaluator +from pyiceberg.manifest import ManifestContent, ManifestEntry, ManifestEntryStatus, ManifestFile +from pyiceberg.table import Table +from pyiceberg.table.snapshots import Operation, Snapshot, ancestors_between +from pyiceberg.typedef import Record + +VALIDATE_DATA_FILES_EXIST_OPERATIONS = {Operation.OVERWRITE, Operation.REPLACE, Operation.DELETE} + + +class ValidationException(Exception): + """Raised when validation fails.""" + + +def validation_history( + table: Table, + from_snapshot: Snapshot, + to_snapshot: Optional[Snapshot], + matching_operations: set[Operation], + manifest_content_filter: ManifestContent, +) -> tuple[list[ManifestFile], set[Snapshot]]: + """Return newly added manifests and snapshot IDs between the starting snapshot and parent snapshot. 
+ + Args: + table: Table to get the history from + from_snapshot: Parent snapshot to get the history from + to_snapshot: Starting snapshot + matching_operations: Operations to match on + manifest_content_filter: Manifest content type to filter + + Raises: + ValidationException: If no matching snapshot is found or only one snapshot is found + + Returns: + List of manifest files and set of snapshots matching conditions + """ + manifests_files: list[ManifestFile] = [] + snapshots: set[Snapshot] = set() + + last_snapshot = None + for snapshot in ancestors_between(from_snapshot, to_snapshot, table.metadata): + last_snapshot = snapshot + summary = snapshot.summary + if summary is None or summary.operation not in matching_operations: + continue + + snapshots.add(snapshot) + manifests_files.extend( + [ + manifest + for manifest in snapshot.manifests(table.io) + if manifest.added_snapshot_id == snapshot.snapshot_id and manifest.content == manifest_content_filter + ] + ) + + if last_snapshot is None or last_snapshot.snapshot_id == from_snapshot.snapshot_id: + raise ValidationException("No matching snapshot found.") + + return manifests_files, snapshots + + +def deleted_data_files( + table: Table, + starting_snapshot: Snapshot, + data_filter: Optional[BooleanExpression], + parent_snapshot: Optional[Snapshot], + partition_set: Optional[set[Record]], +) -> Iterator[ManifestEntry]: + """Find deleted data files matching a filter since a starting snapshot. 
+ + Args: + table: Table to validate + starting_snapshot: Snapshot current at the start of the operation + data_filter: Expression used to find deleted data files + partition_set: a set of partitions to find deleted data files + parent_snapshot: Ending snapshot on the branch being validated + + Returns: + List of deleted data files matching the filter + """ + # if there is no current table state, no files have been deleted + if parent_snapshot is None: + return + + manifests, new_snapshots = validation_history( + table, + starting_snapshot, + parent_snapshot, + VALIDATE_DATA_FILES_EXIST_OPERATIONS, + ManifestContent.DATA, + ) + + if data_filter is not None: + evaluator = _StrictMetricsEvaluator(table.schema(), data_filter).eval + + new_snapshot_ids = {s.snapshot_id for s in new_snapshots} + + for manifest in manifests: + for entry in manifest.fetch_manifest_entry(table.io, discard_deleted=False): + if entry.snapshot_id not in new_snapshot_ids: + continue + + if entry.status != ManifestEntryStatus.DELETED: + continue + + if data_filter is not None and not evaluator(entry.data_file): + continue + + if partition_set is not None and (entry.data_file.spec_id, entry.data_file.partition) not in partition_set: + continue Review Comment: It's probably cleaner to put each of these conditions on its own line and wrap them in an `any` call. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org