Fokko commented on code in PR #1958:
URL: https://github.com/apache/iceberg-python/pull/1958#discussion_r2071124751


##########
pyiceberg/table/__init__.py:
##########
@@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame:
 
         return pl.scan_iceberg(self)
 
+    def delete_orphaned_files(self) -> None:
+        """Delete orphaned files in the table."""
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        location = self.location()

Review Comment:
   Nit: should we move this variable assignment down to where we start using it?



##########
pyiceberg/table/__init__.py:
##########
@@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame:
 
         return pl.scan_iceberg(self)
 
+    def delete_orphaned_files(self) -> None:
+        """Delete orphaned files in the table."""
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        location = self.location()
+
+        all_known_files = []
+        snapshots = self.snapshots()
+        snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots]
+        all_manifests_table = self.inspect.all_manifests(snapshots)
+        all_known_files.extend(all_manifests_table["path"].to_pylist())
+
+        executor = ExecutorFactory.get_or_create()
+        files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids)
+        all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist())

Review Comment:
   How about returning a set of paths per snapshot? That way we can union all of them into a single set:
   
   ```suggestion
           files_by_snapshots: Iterator[Set[str]] = executor.map(
               lambda snapshot_id: set(self.inspect.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids
           )
           datafile_paths = reduce(set.union, files_by_snapshots, set())
           all_known_files.extend(datafile_paths)
   ```
   
   There will probably be quite a bit of overlap between the snapshots in terms of data files.
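
A minimal sketch of the set-union idea, runnable without a table. The `FILES_BY_SNAPSHOT` mapping and `files_for_snapshot` helper are hypothetical stand-ins for `self.inspect.files(snapshot_id)["file_path"].to_pylist()`, and a plain `ThreadPoolExecutor` is assumed in place of `ExecutorFactory`:

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import Dict, Iterator, List, Set

# Hypothetical snapshot -> data file listing; stands in for inspect.files().
FILES_BY_SNAPSHOT: Dict[int, List[str]] = {
    1: ["data/a.parquet"],
    2: ["data/a.parquet", "data/b.parquet"],
    3: ["data/a.parquet", "data/b.parquet", "data/c.parquet"],
}


def files_for_snapshot(snapshot_id: int) -> Set[str]:
    """Return the data file paths referenced by one snapshot as a set."""
    return set(FILES_BY_SNAPSHOT[snapshot_id])


def known_data_files(snapshot_ids: List[int]) -> Set[str]:
    """Union the per-snapshot sets so overlapping paths are stored once."""
    with ThreadPoolExecutor() as executor:
        files_by_snapshots: Iterator[Set[str]] = executor.map(files_for_snapshot, snapshot_ids)
        # The initial empty set keeps reduce() safe when there are no snapshots.
        return reduce(set.union, files_by_snapshots, set())


all_paths = known_data_files([1, 2, 3])
```

Here the three snapshots reference six paths in total, but only three distinct files end up in the result.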



##########
pyiceberg/table/__init__.py:
##########
@@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame:
 
         return pl.scan_iceberg(self)
 
+    def delete_orphaned_files(self) -> None:
+        """Delete orphaned files in the table."""
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        location = self.location()
+
+        all_known_files = []
+        snapshots = self.snapshots()
+        snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots]
+        all_manifests_table = self.inspect.all_manifests(snapshots)
+        all_known_files.extend(all_manifests_table["path"].to_pylist())
+
+        executor = ExecutorFactory.get_or_create()
+        files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids)
+        all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist())
+
+        fs = _fs_from_file_path(self.io, location)
+
+        _, _, path = _parse_location(location)
+        selector = FileSelector(path, recursive=True)
+        # filter to just files as it may return directories
+        all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File]
+
+        orphaned_files = set(all_files).difference(set(all_known_files))
+        logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!")
+
+        if orphaned_files:
+            deletes = executor.map(self.io.delete, orphaned_files)

Review Comment:
   This looks fine, we can just re-use the executor 👍 



##########
pyiceberg/table/__init__.py:
##########
@@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame:
 
         return pl.scan_iceberg(self)
 
+    def delete_orphaned_files(self) -> None:
+        """Delete orphaned files in the table."""
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        location = self.location()
+
+        all_known_files = []

Review Comment:
   Why not make this a set right away?
   ```suggestion
           all_known_files = set()
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame:
 
         return pl.scan_iceberg(self)
 
+    def delete_orphaned_files(self) -> None:
+        """Delete orphaned files in the table."""
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        location = self.location()
+
+        all_known_files = []
+        snapshots = self.snapshots()
+        snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots]
+        all_manifests_table = self.inspect.all_manifests(snapshots)
+        all_known_files.extend(all_manifests_table["path"].to_pylist())

Review Comment:
   I think it would be good to use more meaningful variable names, so the reader knows what's being added:
   ```suggestion
           manifests_paths = self.inspect.all_manifests(snapshots)["path"].to_pylist()
           all_known_files.extend(manifests_paths)
   ```



##########
pyiceberg/table/__init__.py:
##########
@@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame:
 
         return pl.scan_iceberg(self)
 
+    def delete_orphaned_files(self) -> None:
+        """Delete orphaned files in the table."""
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        location = self.location()
+
+        all_known_files = []
+        snapshots = self.snapshots()
+        snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots]
+        all_manifests_table = self.inspect.all_manifests(snapshots)
+        all_known_files.extend(all_manifests_table["path"].to_pylist())
+
+        executor = ExecutorFactory.get_or_create()
+        files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids)
+        all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist())
+
+        fs = _fs_from_file_path(self.io, location)
+
+        _, _, path = _parse_location(location)
+        selector = FileSelector(path, recursive=True)
+        # filter to just files as it may return directories
+        all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File]
+
+        orphaned_files = set(all_files).difference(set(all_known_files))
+        logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!")
+
+        if orphaned_files:
+            deletes = executor.map(self.io.delete, orphaned_files)

Review Comment:
   If one of the deletes throws an error (maybe some other process has already cleaned up the file), the whole execution terminates. Should we add a `try` block to swallow any related exceptions? It would also be good to add a test for this 👍
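
One way this could look, as a sketch only: wrap each delete in a `try` block, log the failure, and keep going. The helper names `delete_all` and `fake_delete` are hypothetical, and a plain `ThreadPoolExecutor` is assumed rather than PyIceberg's executor:

```python
import logging
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List, Optional, Set

logger = logging.getLogger(__name__)


def delete_all(delete: Callable[[str], None], paths: Iterable[str]) -> Set[str]:
    """Attempt every delete; return the paths that failed instead of raising."""

    def _safe_delete(path: str) -> Optional[str]:
        try:
            delete(path)
            return None
        except Exception as exc:  # broad on purpose: one bad file must not stop the rest
            logger.warning("Failed to delete %s: %s", path, exc)
            return path

    with ThreadPoolExecutor() as executor:
        # list() drains the lazy map so every delete is actually attempted.
        results = list(executor.map(_safe_delete, paths))
    return {path for path in results if path is not None}


# Hypothetical delete function: one file was already removed by another process.
deleted: List[str] = []


def fake_delete(path: str) -> None:
    if path == "data/gone.parquet":
        raise FileNotFoundError(path)
    deleted.append(path)


failed = delete_all(fake_delete, ["data/a.parquet", "data/gone.parquet", "data/b.parquet"])
```

A test for the real method could follow the same shape: make one delete raise and assert that the remaining files are still removed.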



##########
pyiceberg/table/__init__.py:
##########
@@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame:
 
         return pl.scan_iceberg(self)
 
+    def delete_orphaned_files(self) -> None:

Review Comment:
   I think it would be good to add some of the options that we also have [on the Java side](https://iceberg.apache.org/docs/nightly/spark-procedures/#remove_orphan_files), at a minimum:
   
   - `older_than`: Remove orphan files created before this timestamp (defaults to 3 days ago). Some process may be writing to the table and have files staged to be added to the metadata tree. If we don't take this into account, these files might be removed in the window between writing and committing.
   - `dry_run`: When true, don't actually remove files (defaults to false).
   
   I think it would also be nice to return the set of files removed:
   ```suggestion
       def delete_orphaned_files(self) -> Set[str]:
   ```
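
A rough sketch of how the two options could interact, under assumptions: `remove_orphans` is a hypothetical helper, and the `candidates` mapping of path to last-modified time is an illustrative input (with PyArrow, the `mtime` of each `FileInfo` would be one possible source for it):

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict, List, Optional, Set


def remove_orphans(
    candidates: Dict[str, datetime],  # orphan path -> last-modified time (assumed input)
    delete: Callable[[str], None],
    older_than: Optional[datetime] = None,
    dry_run: bool = False,
) -> Set[str]:
    """Return the orphans older than the cutoff; delete them unless dry_run is set."""
    if older_than is None:
        # Default cutoff of 3 days ago protects files staged by in-flight writers.
        older_than = datetime.now(timezone.utc) - timedelta(days=3)
    orphans = {path for path, mtime in candidates.items() if mtime < older_than}
    if not dry_run:
        for path in orphans:
            delete(path)
    return orphans


now = datetime(2025, 5, 2, tzinfo=timezone.utc)
listing = {
    "data/old-orphan.parquet": now - timedelta(days=10),
    "data/just-written.parquet": now - timedelta(minutes=5),
}
deleted: List[str] = []
cutoff = now - timedelta(days=3)

# Dry run: reports what would be removed without touching anything.
preview = remove_orphans(listing, deleted.append, older_than=cutoff, dry_run=True)
```

Returning the set in both modes lets a caller inspect a dry run first and then compare it with what a real run removed.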



##########
pyiceberg/table/inspect.py:
##########
@@ -645,10 +645,10 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
     def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table":
         return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
 
-    def all_manifests(self) -> "pa.Table":
+    def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table":
         import pyarrow as pa
 
-        snapshots = self.tbl.snapshots()
+        snapshots = snapshots or self.tbl.snapshots()
         if not snapshots:

Review Comment:
   Let's save that for another PR. I don't think we can just change this API, since folks might be using it. We could allow for a `Union[list[Snapshot], Iterable[int]]`?
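
For illustration only, a union-typed argument could be normalized up front so the rest of the method works with IDs. The `Snapshot` dataclass below is a hypothetical stand-in for PyIceberg's class, and `to_snapshot_ids` is an assumed helper name:

```python
from dataclasses import dataclass
from typing import Iterable, List, Union


@dataclass(frozen=True)
class Snapshot:
    """Hypothetical stand-in for pyiceberg's Snapshot; only the ID matters here."""

    snapshot_id: int


def to_snapshot_ids(snapshots: Union[List[Snapshot], Iterable[int]]) -> List[int]:
    """Accept either Snapshot objects or raw snapshot IDs and return plain IDs."""
    return [s.snapshot_id if isinstance(s, Snapshot) else s for s in snapshots]


from_objects = to_snapshot_ids([Snapshot(11), Snapshot(12)])
from_ids = to_snapshot_ids(iter([11, 12]))
```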



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

