jqin61 commented on code in PR #569: URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1599183250
##########
pyiceberg/table/__init__.py:
##########
@@ -2897,12 +2987,152 @@ def _commit(self) -> UpdatesAndRequirements:
             ),
             (
                 AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid),
-                AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id, ref="main"),
+                AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id, ref="main"),
             ),
         )


-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+    """Will delete manifest entries from the current snapshot based on the predicate.
+
+    This will produce a DELETE snapshot:
+        Data files were removed and their contents logically deleted and/or delete
+        files were added to delete rows.
+
+    From the specification
+    """
+
+    _predicate: BooleanExpression
+
+    def __init__(
+        self,
+        operation: Operation,
+        transaction: Transaction,
+        io: FileIO,
+        commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        self._predicate = AlwaysFalse()
+
+    def _commit(self) -> UpdatesAndRequirements:
+        # Only produce a commit when there is something to delete
+        if self.files_affected:
+            return super()._commit()
+        else:
+            return (), ()
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        project = inclusive_projection(schema, spec)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)
+
+    def delete_by_predicate(self, predicate: BooleanExpression) -> None:
+        self._predicate = Or(self._predicate, predicate)
+
+    @cached_property
+    def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry], bool]:
+        """Computes all the delete operation and cache it when nothing changes.
+
+        Returns:
+            - List of existing manifests that are not affected by the delete operation.
+            - The manifest-entries that are deleted based on the metadata.
+            - Flag indicating that rewrites of data-files are needed.
+ """ + schema = self._transaction.table_metadata.schema() + + def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry: + return ManifestEntry( + status=status, + snapshot_id=entry.snapshot_id, + data_sequence_number=entry.data_sequence_number, + file_sequence_number=entry.file_sequence_number, + data_file=entry.data_file, + ) + + manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator) + strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval + inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval + + existing_manifests = [] + total_deleted_entries = [] + partial_rewrites_needed = False + self._deleted_data_files = set() + if snapshot := self._transaction.table_metadata.current_snapshot(): + for manifest_file in snapshot.manifests(io=self._io): + if manifest_file.content == ManifestContent.DATA: + if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file): + # If the manifest isn't relevant, we can just keep it in the manifest-list + existing_manifests.append(manifest_file) + else: + # It is relevant, let's check out the content + deleted_entries = [] + existing_entries = [] + for entry in manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True): + if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH: + deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED)) + self._deleted_data_files.add(entry.data_file) + elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH: + existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING)) + else: + # Based on the metadata, it is unsure to say if the file can be deleted + partial_rewrites_needed = True + + if len(deleted_entries) > 0: + total_deleted_entries += deleted_entries + + # Rewrite the manifest + if len(existing_entries) > 0: + output_file_location = _new_manifest_path( + location=self._transaction.table_metadata.location, + num=next(self._manifest_counter), + commit_uuid=self.commit_uuid, + ) + with write_manifest( + format_version=self._transaction.table_metadata.format_version, + spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id], + schema=self._transaction.table_metadata.schema(), + output_file=self._io.new_output(output_file_location), + snapshot_id=self._snapshot_id, + ) as writer: + for existing_entry in existing_entries: + writer.add_entry(existing_entry) + existing_manifests.append(writer.to_manifest_file()) Review Comment: The current behavior writes one existing manifest for data file entries which are not deleted by DeleteFiles for each old manifest in parent snapshot, I am thinking maybe it could be a future enhancement to collect the existing_entries across different old manifests and then write existing_manifest. Imagine such a scenario where there are multiple small fast appends before we call DeleteFiles, and each fast_append appends data that is partially deleted by the final DeleteFiles, then this DeleteFiles snapshot will have almost the same amount of 'existing' manifests rather than merging them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 