Fokko commented on code in PR #569:
URL: https://github.com/apache/iceberg-python/pull/569#discussion_r1594055639


##########
pyiceberg/table/__init__.py:
##########
@@ -443,6 +468,54 @@ def overwrite(
                 for data_file in data_files:
                     update_snapshot.append_data_file(data_file)
 
+    def delete(self, delete_filter: BooleanExpression, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
+        if (
+            self.table_metadata.properties.get(TableProperties.DELETE_MODE, TableProperties.DELETE_MODE_COPY_ON_WRITE)
+            == TableProperties.DELETE_MODE_MERGE_ON_READ
+        ):
+            raise NotImplementedError("Merge on read is not yet supported")
+
+        with self.update_snapshot(snapshot_properties=snapshot_properties).delete() as delete_snapshot:
+            delete_snapshot.delete_by_predicate(delete_filter)
+
+        # Check if there are any files that require an actual rewrite of a data file
+        if delete_snapshot.rewrites_needed is True:
+            # When we want to filter out certain rows, we want to invert the expression:
+            # "delete id = 22" means that we want to look for that value, and then remove
+            # it from the Parquet file
+            delete_row_filter = Not(delete_filter)
+            with self.update_snapshot(snapshot_properties=snapshot_properties).overwrite() as overwrite_snapshot:
+                # Potential optimization: check whether the files actually contain relevant data.
+                files = self._scan(row_filter=delete_filter).plan_files()
+
+                counter = itertools.count(0)
+
+                # This will load the Parquet file into memory, including:
+                #   - Filtering out the rows based on the delete filter
+                #   - Projecting it to the current schema
+                #   - Applying the positional deletes if they are there
+                # When writing:
+                #   - Apply the latest partition-spec
+                #   - And the sort order, once it is added
+                for original_file in files:
+                    df = project_table(
+                        tasks=[original_file],
+                        table_metadata=self._table.metadata,
+                        io=self._table.io,
+                        row_filter=delete_row_filter,
+                        projected_schema=self.table_metadata.schema(),
+                    )
+                    for data_file in _dataframe_to_data_files(
+                        io=self._table.io,
+                        df=df,
+                        table_metadata=self._table.metadata,
+                        write_uuid=overwrite_snapshot.commit_uuid,

Review Comment:
   I would love to do that in a separate PR. Ideally, files are dropped as a whole, but we can experiment with parallelizing and combining files.
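
   For context, a minimal usage sketch of the copy-on-write delete path in the diff above; the catalog name and table identifier are placeholders, and the call follows the delete(delete_filter=...) signature introduced in this PR:

       from pyiceberg.catalog import load_catalog
       from pyiceberg.expressions import EqualTo

       # Placeholder catalog and table names, for illustration only.
       catalog = load_catalog("default")
       tbl = catalog.load_table("default.orders")

       # Copy-on-write delete: rows with id == 22 are removed. Data files fully
       # covered by the predicate are dropped as a whole; files that only
       # partially match are rewritten with the inverted filter
       # Not(EqualTo("id", 22)), keeping the rows that should survive.
       tbl.delete(delete_filter=EqualTo("id", 22))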



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

