jayceslesar commented on code in PR #1958: URL: https://github.com/apache/iceberg-python/pull/1958#discussion_r2072162570
########## pyiceberg/table/inspect.py: ########## @@ -645,10 +645,10 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def all_manifests(self) -> "pa.Table": + def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table": import pyarrow as pa - snapshots = self.tbl.snapshots() + snapshots = snapshots or self.tbl.snapshots() if not snapshots: Review Comment: Modified this, let me know what you think ########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self) -> None: + """Delete orphaned files in the table.""" + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + location = self.location() + + all_known_files = [] + snapshots = self.snapshots() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + all_manifests_table = self.inspect.all_manifests(snapshots) + all_known_files.extend(all_manifests_table["path"].to_pylist()) + + executor = ExecutorFactory.get_or_create() + files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids) + all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist()) + + fs = _fs_from_file_path(self.io, location) + + _, _, path = _parse_location(location) + selector = FileSelector(path, recursive=True) + # filter to just files as it may return directories + all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File] + + orphaned_files = 
set(all_files).difference(set(all_known_files)) + logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") + + if orphaned_files: + deletes = executor.map(self.io.delete, orphaned_files) Review Comment: that has been done and a test was added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org