jayceslesar commented on code in PR #1958: URL: https://github.com/apache/iceberg-python/pull/1958#discussion_r2072662780
########## pyiceberg/table/inspect.py: ########## @@ -657,3 +665,62 @@ def all_manifests(self) -> "pa.Table": lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] ) return pa.concat_tables(manifests_by_snapshots) + + def all_known_files(self) -> dict[str, set[str]]: + """Get all the known files in the table. + + Returns: + dict of {file_type: set of file paths} for each file type. + """ + snapshots = self.tbl.snapshots() + + _all_known_files = {} + _all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist()) + _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots} + _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics} + + executor = ExecutorFactory.get_or_create() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + files_by_snapshots: Iterator[Set[str]] = executor.map( + lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids + ) + _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set()) + + return _all_known_files + + def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]: + """Get all the orphaned files in the table. + + Args: + location: The location to check for orphaned files. + older_than: The time period to check for orphaned files. Defaults to 3 days. + + Returns: + A set of orphaned file paths. + + """ + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + all_known_files = self.all_known_files() + flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set()) + + fs = _fs_from_file_path(self.tbl.io, location) + + _, _, path = _parse_location(location) + selector = FileSelector(path, recursive=True) + # filter to just files as it may return directories, and filter on time + as_of = datetime.now(timezone.utc) - older_than if older_than else None Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org