smaheshwar-pltr commented on code in PR #1958:
URL: https://github.com/apache/iceberg-python/pull/1958#discussion_r2072429901


##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()
+        snapshots = self.tbl.snapshots()
+        manifests_paths = self.all_manifests(snapshots)["path"].to_pylist()
+        all_known_files.update(manifests_paths)
+
+        executor = ExecutorFactory.get_or_create()
+        files_by_snapshots: Iterator[Set[str]] = executor.map(
+            lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist())
+        )
+        datafile_paths: set[str] = reduce(set.union, files_by_snapshots, set())
+        all_known_files.update(datafile_paths)
+
+        fs = _fs_from_file_path(self.tbl.io, location)
+
+        _, _, path = _parse_location(location)
+        selector = FileSelector(path, recursive=True)
+        # filter to just files as it may return directories, and filter on time
+        as_of = datetime.now(timezone.utc) - older_than if older_than else None
+        all_files = [f for f in fs.get_file_info(selector) if f.type == FileType.File and (as_of is None or (f.mtime < as_of))]
+
+        orphaned_files = set(all_files).difference(all_known_files)

Review Comment:
   I think we need to be careful here. `all_files` is a list of these `FileInfo` objects, I think, but `all_known_files` is a set of `str`s. So the set difference here won't do anything, because a `FileInfo` object won't be in a `str` set.

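   To illustrate the type mismatch described above, here is a minimal sketch. `FakeFileInfo` is a hypothetical stand-in for `pyarrow.fs.FileInfo` that exposes only a `path` attribute, and the paths are made up. It suggests one possible fix, comparing path strings on both sides, and is not necessarily how the PR should resolve it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeFileInfo:
    """Hypothetical stand-in for pyarrow.fs.FileInfo, exposing only `path`."""

    path: str


# Paths the table metadata knows about (illustrative values only).
all_known_files = {"warehouse/db/tbl/data/a.parquet"}

# What a directory listing might return: objects, not strings.
all_files = [
    FakeFileInfo("warehouse/db/tbl/data/a.parquet"),
    FakeFileInfo("warehouse/db/tbl/data/orphan.parquet"),
]

# Objects compared against strings: nothing matches, so nothing is removed
# and every listed file looks orphaned.
mixed = set(all_files).difference(all_known_files)

# Comparing path strings on both sides leaves only the unreferenced file.
by_path = {f.path for f in all_files}.difference(all_known_files)
```

   With the mixed types, the known file `a.parquet` stays in the result, which is the dangerous case for a procedure that later deletes whatever is reported as orphaned.
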
##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()

Review Comment:
   We also want to have manifest list files here (I don't see them now). Otherwise, they'll be removed by the procedure and the table will be "corrupted".

   (Related: when looking at Java tests, I noticed https://github.com/apache/iceberg/pull/12957)


##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()

Review Comment:
   The same goes for the current metadata JSON file, and I think to match Java behaviour we want to include all files in the metadata log of the current metadata file too.

   I think there are more files we might be missing - I think tests would be nice to make sure we're not missing something!
   (Perhaps inspiration can be taken from the [Java ones](https://github.com/apache/iceberg/blob/main/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRemoveOrphanFilesProcedure.java))


##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()

Review Comment:
   Part of me wonders whether we could expose this as a method: a public, documented `inspect` utility that returns all files referenced by a table. Curious what others think about whether this would be useful, I'm not fully convinced myself.

   (We could also then restructure orphaned file detection to use that)


##########
pyiceberg/table/inspect.py:
##########
@@ -657,3 +665,37 @@ def all_manifests(self) -> "pa.Table":
             lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
         )
         return pa.concat_tables(manifests_by_snapshots)
+
+    def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]:
+        try:
+            import pyarrow as pa  # noqa: F401
+        except ModuleNotFoundError as e:
+            raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e
+
+        from pyarrow.fs import FileSelector, FileType
+
+        from pyiceberg.io.pyarrow import _fs_from_file_path
+
+        all_known_files = set()
+        snapshots = self.tbl.snapshots()
+        manifests_paths = self.all_manifests(snapshots)["path"].to_pylist()
+        all_known_files.update(manifests_paths)
+
+        executor = ExecutorFactory.get_or_create()
+        files_by_snapshots: Iterator[Set[str]] = executor.map(
+            lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist())
+        )
+        datafile_paths: set[str] = reduce(set.union, files_by_snapshots, set())

Review Comment:
   Won't this always be empty? I don't see any `Iterable` submitted to the executor pool above.
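
   The empty-result concern about `executor.map` can be checked with a small sketch. It assumes the executor behaves like a standard `concurrent.futures` executor; `files_for` and `snapshot_ids` are hypothetical stand-ins for `self.files(snapshot_id)["file_path"].to_pylist()` and the table's snapshot IDs.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import reduce

# Illustrative snapshot IDs; a real table would supply its own.
snapshot_ids = [1, 2, 3]


def files_for(snapshot_id: int) -> set[str]:
    """Hypothetical stand-in for listing the data files of one snapshot."""
    return {f"data/{snapshot_id}.parquet"}


with ThreadPoolExecutor(max_workers=2) as executor:
    # No iterable is passed, so map() has nothing to call the function with
    # and the reduction falls back to its initial empty set.
    no_input = reduce(set.union, executor.map(files_for), set())

    # Passing the snapshot IDs yields one set per snapshot, which the
    # reduction merges into a single set of paths.
    with_input = reduce(set.union, executor.map(files_for, snapshot_ids), set())
```

   Under that assumption, the call in the diff would leave `datafile_paths` empty, so no data file would be treated as known.
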