Fokko commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1666868594
##########
pyiceberg/table/__init__.py:
##########
@@ -454,6 +482,74 @@ def overwrite(
        for data_file in data_files:
            update_snapshot.append_data_file(data_file)

+    def delete(self, delete_filter: Union[str, BooleanExpression], snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        if (
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
+            == TableProperties.DELETE_MODE_MERGE_ON_READ
+        ):
+            warnings.warn("Merge on read is not yet supported, falling back to copy-on-write")
+
+        if isinstance(delete_filter, str):
+            delete_filter = _parse_row_filter(delete_filter)
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        # Check if there are any files that require an actual rewrite of a data file
+        if delete_snapshot.rewrites_needed is True:
+            bound_delete_filter = bind(self._table.schema(), delete_filter, case_sensitive=True)
+            preserve_row_filter = expression_to_pyarrow(Not(bound_delete_filter))
+
+            files = self._scan(row_filter=delete_filter).plan_files()
+
+            commit_uuid = uuid.uuid4()
+            counter = itertools.count(0)
+
+            replaced_files: List[Tuple[DataFile, List[DataFile]]] = []
+            # This will load the Parquet file into memory and:
+            # - Filter out the rows based on the delete filter
+            # - Project it to the current schema
+            # - Apply the positional deletes, if any
+            # When writing, it will:
+            # - Apply the latest partition spec
+            # - And the sort order, once it is added
+            for original_file in files:
+                df = project_table(
+                    tasks=[original_file],
+                    table_metadata=self._table.metadata,
+                    io=self._table.io,
+                    row_filter=AlwaysTrue(),
+                    projected_schema=self.table_metadata.schema(),
+                )
+                filtered_df = df.filter(preserve_row_filter)
+
+                # Only rewrite the file if any records are actually being deleted
+                if len(df) != len(filtered_df):
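
For context, a minimal usage sketch of the `delete` API this diff adds; the catalog configuration and table identifier below are illustrative, not from the PR:

```python
from pyiceberg.catalog import load_catalog

# Assumes a catalog named "default" is configured (e.g. in ~/.pyiceberg.yaml).
catalog = load_catalog("default")
table = catalog.load_table("db.events")  # hypothetical table

# A string filter is parsed via _parse_row_filter, as shown above; a
# BooleanExpression can also be passed directly.
table.delete(delete_filter="x = 22")
```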
Review Comment:
This is done on a per-file basis, so hopefully that fits into memory. I do think that iterating over record batches might be an interesting thing to do here.
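
For instance, a rough sketch of that record-batch idea, assuming pyarrow; the file path and the stand-in filter expression are illustrative:

```python
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

# Stand-in for the expression produced by expression_to_pyarrow(Not(...)).
preserve_row_filter = pc.field("x") != 22

dataset = ds.dataset("00000-0-data.parquet", format="parquet")
kept_batches: list[pa.Table] = []
rewrite_needed = False
for batch in dataset.to_batches():
    # Table.filter accepts a pyarrow compute Expression, so wrap each batch
    # in a one-batch table rather than materializing the whole file.
    filtered = pa.Table.from_batches([batch]).filter(preserve_row_filter)
    rewrite_needed |= filtered.num_rows < batch.num_rows
    kept_batches.append(filtered)

if rewrite_needed:
    preserved = pa.concat_tables(kept_batches)  # rows to write back out
```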
> Would the intersection of these two lists of files be the ones we would have to rewrite?

We don't know for sure. If we're looking for `x = 22` and the lower and upper bounds are `[19, 25]`, the file will match the evaluator, but we still don't know whether any row actually matches the predicate until we've gone through the file.
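
A plain-Python sketch of that point (not PyIceberg's actual metrics evaluator):

```python
# Column bounds can prove a file *might* contain matches, never that it does.
def might_contain(lower: int, upper: int, value: int) -> bool:
    # Inclusive bounds check, as a metrics evaluator would apply it.
    return lower <= value <= upper

# The file's metrics say x spans [19, 25], so x = 22 cannot be ruled out
# and the file matches the evaluator...
assert might_contain(19, 25, 22)

# ...yet the rows may skip 22 entirely; only reading the file settles
# whether a rewrite is actually needed.
rows = [19, 21, 24, 25]
assert not any(x == 22 for x in rows)
```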