Fokko commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1553580296


##########
pyiceberg/table/__init__.py:
##########
@@ -2726,6 +2731,112 @@ def _commit(self) -> UpdatesAndRequirements:
         )
 
 
+class DeleteFiles(_MergingSnapshotProducer):
+    _predicate: BooleanExpression
+
+    def __init__(
+        self,
+        operation: Operation,
+        transaction: Transaction,
+        io: FileIO,
+        commit_uuid: Optional[uuid.UUID] = None,
+        snapshot_properties: Dict[str, str] = EMPTY_DICT,
+    ):
+        super().__init__(operation, transaction, io, commit_uuid, snapshot_properties)
+        self._predicate = AlwaysFalse()
+
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        project = inclusive_projection(schema, spec)
+        return project(self._predicate)
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, schema, self.partition_filters[spec_id], case_sensitive=True)
+
+    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
+        schema = self._transaction.table_metadata.schema()
+        spec = self._transaction.table_metadata.specs()[spec_id]
+        partition_type = spec.partition_type(schema)
+        partition_schema = Schema(*partition_type.fields)
+        partition_expr = self.partition_filters[spec_id]
+
+        return lambda data_file: expression_evaluator(partition_schema, partition_expr, case_sensitive=True)(data_file.partition)
+
+    def delete(self, predicate: BooleanExpression) -> None:
+        self._predicate = Or(self._predicate, predicate)
+
+    @cached_property
+    def _compute_deletes(self) -> Tuple[List[ManifestFile], List[ManifestEntry]]:
+        schema = self._transaction.table_metadata.schema()
+
+        def _copy_with_new_status(entry: ManifestEntry, status: ManifestEntryStatus) -> ManifestEntry:
+            return ManifestEntry(
+                status=status,
+                snapshot_id=entry.snapshot_id,
+                data_sequence_number=entry.data_sequence_number,
+                file_sequence_number=entry.file_sequence_number,
+                data_file=entry.data_file,
+            )
+
+        manifest_evaluators: Dict[int, Callable[[ManifestFile], bool]] = KeyDefaultDict(self._build_manifest_evaluator)
+        strict_metrics_evaluator = _StrictMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval
+        inclusive_metrics_evaluator = _InclusiveMetricsEvaluator(schema, self._predicate, case_sensitive=True).eval
+
+        existing_manifests = []
+        total_deleted_entries = []
+        if snapshot := self._transaction.table_metadata.current_snapshot():
+            for num, manifest_file in enumerate(snapshot.manifests(io=self._io)):
+                if not manifest_evaluators[manifest_file.partition_spec_id](manifest_file):
+                    # If the manifest isn't relevant, we can just keep it in the manifest-list
+                    existing_manifests.append(manifest_file)
+                else:
+                    # It is relevant, let's check out the content
+                    deleted_entries = []
+                    existing_entries = []
+                    for entry in manifest_file.fetch_manifest_entry(io=self._io):
+                        if strict_metrics_evaluator(entry.data_file) == ROWS_MUST_MATCH:
+                            deleted_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.DELETED))
+                        elif inclusive_metrics_evaluator(entry.data_file) == ROWS_CANNOT_MATCH:
+                            existing_entries.append(_copy_with_new_status(entry, ManifestEntryStatus.EXISTING))
+                        else:
+                            raise ValueError("Deletes do not support rewrites of data files")
+
+                    if len(deleted_entries) > 0:
+                        total_deleted_entries += deleted_entries
+
+                        # Rewrite the manifest
+                        if len(existing_entries) > 0:
+                            output_file_location = _new_manifest_path(
+                                location=self._transaction.table_metadata.location, num=num, commit_uuid=self.commit_uuid
+                            )
+                            with write_manifest(
+                                format_version=self._transaction.table_metadata.format_version,
+                                spec=self._transaction.table_metadata.specs()[manifest_file.partition_spec_id],
+                                schema=self._transaction.table_metadata.schema(),
+                                output_file=self._io.new_output(output_file_location),
+                                snapshot_id=self._snapshot_id,
+                            ) as writer:
+                                for existing_entry in existing_entries:
+                                    writer.add_entry(existing_entry)

Review Comment:
   Yes, that's correct! Thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to