smaheshwar-pltr commented on code in PR #1958:
URL: https://github.com/apache/iceberg-python/pull/1958#discussion_r2072429901


##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, 
True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = 
timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow 
needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()
+        snapshots = self.tbl.snapshots()
+        manifests_paths = self.all_manifests(snapshots)["path"].to_pylist()
+        all_known_files.update(manifests_paths)
+
+        executor = ExecutorFactory.get_or_create()
+        files_by_snapshots: Iterator[Set[str]] = executor.map(
+            lambda snapshot_id: 
set(self.files(snapshot_id)["file_path"].to_pylist())
+        )
+        datafile_paths: set[str] = reduce(set.union, files_by_snapshots, set())
+        all_known_files.update(datafile_paths)
+
+        fs = _fs_from_file_path(self.tbl.io, location)
+
+        _, _, path = _parse_location(location)
+        selector = FileSelector(path, recursive=True)
+        # filter to just files as it may return directories, and filter on time
+        as_of = datetime.now(timezone.utc) - older_than if older_than else None
+        all_files = [f for f in fs.get_file_info(selector) if f.type == 
FileType.File and (as_of is None or (f.mtime < as_of))]
+
+        orphaned_files = set(all_files).difference(all_known_files)

Review Comment:
   I think we need to be careful here. `all_files` is a list of these 
`FileInfo` objects I think but `all_known_files` is a set of `str`s. So the set 
difference here won't do anything because a `FileInfo` object won't be in a 
`str` set.



##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, 
True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = 
timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow 
needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()

Review Comment:
   We also want to have manifest list files here (I don't see them now). 
Otherwise, they'll be removed by the procedure and the table will be 
"corrupted".
   
   (Related: when looking at Java tests, I noticed 
https://github.com/apache/iceberg/pull/12957)



##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, 
True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = 
timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow 
needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()

Review Comment:
   The same goes for the current metadata JSON file, and I think to match Java 
behaviour we want to include all files in the metadata log of the current 
metadata file too. 
   
   I think there are more files we might be missing - I think tests would be 
nice to make sure we're not missing something! (Perhaps inspiration can be 
taken from the [Java 
ones](https://github.com/apache/iceberg/blob/main/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java))



##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, 
True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = 
timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow 
needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()

Review Comment:
   Part of me wonders whether we could expose this as a method: a public, 
documented `inspect` utility that returns all files referenced by a table. 
Curious what others think about whether this would be useful, I'm not fully 
convinced myself. (We could also then restructure orphaned file detection to 
use that)



##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, 
True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = 
timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow 
needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()
+        snapshots = self.tbl.snapshots()
+        manifests_paths = self.all_manifests(snapshots)["path"].to_pylist()
+        all_known_files.update(manifests_paths)
+
+        executor = ExecutorFactory.get_or_create()
+        files_by_snapshots: Iterator[Set[str]] = executor.map(
+            lambda snapshot_id: 
set(self.files(snapshot_id)["file_path"].to_pylist())
+        )
+        datafile_paths: set[str] = reduce(set.union, files_by_snapshots, set())

Review Comment:
   Won't this always be empty? I don't see any `Iterable` submitted to the 
executor pool above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to