Fokko commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1615942072
##########
pyiceberg/table/__init__.py:
##########
@@ -2897,12 +2987,152 @@ def _commit(self) -> UpdatesAndRequirements:
),
(
AssertTableUUID(uuid=self._transaction.table_metadata.table_uuid),
- AssertRefSnapshotId(snapshot_id=self._parent_snapshot_id,
ref="main"),
+
AssertRefSnapshotId(snapshot_id=self._transaction.table_metadata.current_snapshot_id,
ref="main"),
),
)
-class FastAppendFiles(_MergingSnapshotProducer):
+class DeleteFiles(_MergingSnapshotProducer["DeleteFiles"]):
+ """Will delete manifest entries from the current snapshot based on the
predicate.
+
+ This will produce a DELETE snapshot:
+ Data files were removed and their contents logically deleted and/or
delete
+ files were added to delete rows.
+
+ From the specification
+ """
+
+ _predicate: BooleanExpression
+
+ def __init__(
+ self,
+ operation: Operation,
+ transaction: Transaction,
+ io: FileIO,
+ commit_uuid: Optional[uuid.UUID] = None,
+ snapshot_properties: Dict[str, str] = EMPTY_DICT,
+ ):
+ super().__init__(operation, transaction, io, commit_uuid,
snapshot_properties)
+ self._predicate = AlwaysFalse()
+
+ def _commit(self) -> UpdatesAndRequirements:
+ # Only produce a commit when there is something to delete
+ if self.files_affected:
+ return super()._commit()
+ else:
+ return (), ()
+
+ def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+ schema = self._transaction.table_metadata.schema()
+ spec = self._transaction.table_metadata.specs()[spec_id]
+ project = inclusive_projection(schema, spec)
+ return project(self._predicate)
+
+ @cached_property
+ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+ return KeyDefaultDict(self._build_partition_projection)
+
+ def _build_manifest_evaluator(self, spec_id: int) ->
Callable[[ManifestFile], bool]:
+ schema = self._transaction.table_metadata.schema()
+ spec = self._transaction.table_metadata.specs()[spec_id]
+ return manifest_evaluator(spec, schema,
self.partition_filters[spec_id], case_sensitive=True)
+
+ def delete_by_predicate(self, predicate: BooleanExpression) -> None:
+ self._predicate = Or(self._predicate, predicate)
+
+ @cached_property
+ def _compute_deletes(self) -> Tuple[List[ManifestFile],
List[ManifestEntry], bool]:
+ """Computes all the delete operation and cache it when nothing changes.
+
+ Returns:
+ - List of existing manifests that are not affected by the delete
operation.
+ - The manifest-entries that are deleted based on the metadata.
+ - Flag indicating that rewrites of data-files are needed.
+ """
+ schema = self._transaction.table_metadata.schema()
+
+ def _copy_with_new_status(entry: ManifestEntry, status:
ManifestEntryStatus) -> ManifestEntry:
+ return ManifestEntry(
+ status=status,
+ snapshot_id=entry.snapshot_id,
+ data_sequence_number=entry.data_sequence_number,
+ file_sequence_number=entry.file_sequence_number,
+ data_file=entry.data_file,
+ )
+
+ manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] =
KeyDefaultDict(self._build_manifest_evaluator)
+ strict_metrics_evaluator = _StrictMetricsEvaluator(schema,
self._predicate, case_sensitive=True).eval
+ inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema,
self._predicate, case_sensitive=True).eval
+
+ existing_manifests = []
+ total_deleted_entries = []
+ partial_rewrites_needed = False
+ self._deleted_data_files = set()
+ if snapshot := self._transaction.table_metadata.current_snapshot():
+ for manifest_file in snapshot.manifests(io=self._io):
+ if manifest_file.content == ManifestContent.DATA:
+ if not
manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
+ # If the manifest isn't relevant, we can just keep it
in the manifest-list
+ existing_manifests.append(manifest_file)
+ else:
+ # It is relevant, let's check out the content
+ deleted_entries = []
+ existing_entries = []
+ for entry in
manifest_file.fetch_manifest_entry(io=self._io, discard_deleted=True):
+ if strict_metrics_evaluator(entry.data_file) ==
ROWS_MUST_MATCH:
+
deleted_entries.append(_copy_with_new_status(entry,
ManifestEntryStatus.DELETED))
+ self._deleted_data_files.add(entry.data_file)
+ elif inclusive_metrics_evaluator(entry.data_file)
== ROWS_CANNOT_MATCH:
+
existing_entries.append(_copy_with_new_status(entry,
ManifestEntryStatus.EXISTING))
+ else:
+ # Based on the metadata, it is unsure to say
if the file can be deleted
+ partial_rewrites_needed = True
+
+ if len(deleted_entries) > 0:
+ total_deleted_entries += deleted_entries
+
+ # Rewrite the manifest
+ if len(existing_entries) > 0:
+ output_file_location = _new_manifest_path(
+
location=self._transaction.table_metadata.location,
+ num=next(self._manifest_counter),
+ commit_uuid=self.commit_uuid,
+ )
+ with write_manifest(
+
format_version=self._transaction.table_metadata.format_version,
+
spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
+
schema=self._transaction.table_metadata.schema(),
+
output_file=self._io.new_output(output_file_location),
+ snapshot_id=self._snapshot_id,
+ ) as writer:
+ for existing_entry in existing_entries:
+ writer.add_entry(existing_entry)
+
existing_manifests.append(writer.to_manifest_file())
Review Comment:
Yes, that's a very interesting thought. Once we get
https://github.com/apache/iceberg-python/pull/650 in, this should be interesting
👍
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]