Fokko commented on code in PR #1958: URL: https://github.com/apache/iceberg-python/pull/1958#discussion_r2071124751
########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self) -> None: + """Delete orphaned files in the table.""" + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + location = self.location() Review Comment: Nit, should we move this variable assignment downward, where we start using it? ########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self) -> None: + """Delete orphaned files in the table.""" + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + location = self.location() + + all_known_files = [] + snapshots = self.snapshots() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + all_manifests_table = self.inspect.all_manifests(snapshots) + all_known_files.extend(all_manifests_table["path"].to_pylist()) + + executor = ExecutorFactory.get_or_create() + files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids) + all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist()) Review Comment: How about just returning a set of paths? 
This way we can nicely union all of them into a set: ```suggestion files_by_snapshots: Iterator[Set[str]] = executor.map(lambda snapshot_id: set(self.inspect.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids) datafile_paths = reduce(set.union, files_by_snapshots) all_known_files.extend(datafile_paths) ``` There will probably be quite a bit of overlap between the snapshots in terms of data files. ########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self) -> None: + """Delete orphaned files in the table.""" + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + location = self.location() + + all_known_files = [] + snapshots = self.snapshots() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + all_manifests_table = self.inspect.all_manifests(snapshots) + all_known_files.extend(all_manifests_table["path"].to_pylist()) + + executor = ExecutorFactory.get_or_create() + files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids) + all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist()) + + fs = _fs_from_file_path(self.io, location) + + _, _, path = _parse_location(location) + selector = FileSelector(path, recursive=True) + # filter to just files as it may return directories + all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File] + + orphaned_files = set(all_files).difference(set(all_known_files)) + logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") + + if orphaned_files: + deletes = executor.map(self.io.delete, orphaned_files) Review Comment: This looks fine, we can
just re-use the executor 👍 ########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self) -> None: + """Delete orphaned files in the table.""" + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + location = self.location() + + all_known_files = [] Review Comment: Why not make this a set right away? ```suggestion all_known_files = set() ``` ########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self) -> None: + """Delete orphaned files in the table.""" + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + location = self.location() + + all_known_files = [] + snapshots = self.snapshots() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + all_manifests_table = self.inspect.all_manifests(snapshots) + all_known_files.extend(all_manifests_table["path"].to_pylist()) Review Comment: I think it would be good to have a bit more meaningful variable names, so the reader knows what's being added: ```suggestion manifests_paths = self.inspect.all_manifests(snapshots)["path"].to_pylist() all_known_files.extend(manifests_paths) ``` ########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self) -> None: + """Delete orphaned files in the table.""" + try: + import pyarrow as pa # noqa: F401 +
except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + location = self.location() + + all_known_files = [] + snapshots = self.snapshots() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + all_manifests_table = self.inspect.all_manifests(snapshots) + all_known_files.extend(all_manifests_table["path"].to_pylist()) + + executor = ExecutorFactory.get_or_create() + files_by_snapshots: Iterator["pa.Table"] = executor.map(lambda snapshot_id: self.inspect.files(snapshot_id), snapshot_ids) + all_known_files.extend(pa.concat_tables(files_by_snapshots)["file_path"].to_pylist()) + + fs = _fs_from_file_path(self.io, location) + + _, _, path = _parse_location(location) + selector = FileSelector(path, recursive=True) + # filter to just files as it may return directories + all_files = [f.path for f in fs.get_file_info(selector) if f.type == FileType.File] + + orphaned_files = set(all_files).difference(set(all_known_files)) + logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") + + if orphaned_files: + deletes = executor.map(self.io.delete, orphaned_files) Review Comment: When one of the deletes would throw an error (maybe some other process had already cleaned up the file), then the whole execution would terminate. Should we add a `try` block to swallow any related exception? 
Would be good to also add a test for this 👍 ########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1375,45 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self) -> None: Review Comment: I think it would be good to add some options that we also have [on the Java side](https://iceberg.apache.org/docs/nightly/spark-procedures/#remove_orphan_files), at a minimum: - `older_than`: Remove orphan files created before this timestamp (defaults to 3 days ago). It can be that some process is writing to the table, and has some files staged to be added to the metadata tree. If we don't take this into account, it might be that these files are removed in the period between writing and committing. - `dry_run`: When true, don't actually remove files (defaults to false). I think it would be nice to return the set of files removed: ```suggestion def delete_orphaned_files(self) -> Set[str]: ``` ########## pyiceberg/table/inspect.py: ########## @@ -645,10 +645,10 @@ def data_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": def delete_files(self, snapshot_id: Optional[int] = None) -> "pa.Table": return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES}) - def all_manifests(self) -> "pa.Table": + def all_manifests(self, snapshots: Optional[list[Snapshot]] = None) -> "pa.Table": import pyarrow as pa - snapshots = self.tbl.snapshots() + snapshots = snapshots or self.tbl.snapshots() if not snapshots: Review Comment: Let's save that for another PR. I don't think we can just change this API since folks might be using this. We could allow for a `Union[list[Snapshot], Iterable[int]]`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org