kevinjqliu commented on code in PR #1958: URL: https://github.com/apache/iceberg-python/pull/1958#discussion_r2072491310
########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1376,28 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self, older_than: Optional[timedelta] = timedelta(days=3), dry_run: bool = False) -> None: + """Delete orphaned files in the table.""" + location = self.location() + orphaned_files = self.inspect.orphaned_files(location, older_than) + logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") + + def _delete(file: str) -> None: + # don't error if the file doesn't exist + # still catch ctrl-c, etc. + with contextlib.suppress(Exception): + self.io.delete(file) + + if orphaned_files: + if dry_run: + logger.info(f"(Dry Run) Deleted {len(orphaned_files)} orphaned files at {location}!") + else: + executor = ExecutorFactory.get_or_create() + deletes = executor.map(_delete, orphaned_files) + # exhaust + list(deletes) + logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!") + Review Comment: nit: log an else case ########## pyiceberg/table/inspect.py: ########## @@ -657,3 +665,62 @@ def all_manifests(self) -> "pa.Table": lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] ) return pa.concat_tables(manifests_by_snapshots) + + def all_known_files(self) -> dict[str, set[str]]: + """Get all the known files in the table. + + Returns: + dict of {file_type: set of file paths} for each file type. 
+ """ + snapshots = self.tbl.snapshots() + + _all_known_files = {} + _all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist()) + _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots} + _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics} + + executor = ExecutorFactory.get_or_create() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + files_by_snapshots: Iterator[Set[str]] = executor.map( + lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids + ) + _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set()) + + return _all_known_files + + def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]: + """Get all the orphaned files in the table. + Review Comment: nit: add a sentence explaining what orphaned files mean, maybe copy/paste from https://iceberg.apache.org/docs/nightly/spark-procedures/#remove_orphan_files ########## pyiceberg/table/inspect.py: ########## @@ -657,3 +665,62 @@ def all_manifests(self) -> "pa.Table": lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] ) return pa.concat_tables(manifests_by_snapshots) + + def all_known_files(self) -> dict[str, set[str]]: + """Get all the known files in the table. + + Returns: + dict of {file_type: set of file paths} for each file type. 
+ """ + snapshots = self.tbl.snapshots() + + _all_known_files = {} + _all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist()) + _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots} + _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics} + + executor = ExecutorFactory.get_or_create() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + files_by_snapshots: Iterator[Set[str]] = executor.map( + lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids + ) + _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set()) + + return _all_known_files + + def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]: Review Comment: nit: should we expose this as a public function given that there's no equivalent on the java/spark side? we modeled the `inspect` tables based on java's metadata tables. maybe we can change this to `_orphaned_files` for now ########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1376,28 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self, older_than: Optional[timedelta] = timedelta(days=3), dry_run: bool = False) -> None: Review Comment: nit: we should always provide an `older_than` arg. this protects the orphan file deletion job from deleting recently created files that are currently waiting to be committed. 
########## pyiceberg/table/__init__.py: ########## @@ -1371,6 +1376,28 @@ def to_polars(self) -> pl.LazyFrame: return pl.scan_iceberg(self) + def delete_orphaned_files(self, older_than: Optional[timedelta] = timedelta(days=3), dry_run: bool = False) -> None: + """Delete orphaned files in the table.""" + location = self.location() + orphaned_files = self.inspect.orphaned_files(location, older_than) + logger.info(f"Found {len(orphaned_files)} orphaned files at {location}!") + + def _delete(file: str) -> None: + # don't error if the file doesn't exist + # still catch ctrl-c, etc. + with contextlib.suppress(Exception): + self.io.delete(file) + + if orphaned_files: + if dry_run: + logger.info(f"(Dry Run) Deleted {len(orphaned_files)} orphaned files at {location}!") + else: + executor = ExecutorFactory.get_or_create() + deletes = executor.map(_delete, orphaned_files) + # exhaust + list(deletes) + logger.info(f"Deleted {len(orphaned_files)} orphaned files at {location}!") Review Comment: nit: this might not necessarily always be true, esp. when _delete errors are suppressed. what if we count the number of successful deletes here? maybe `_delete` can return `True/False` for whether the delete was successful. ########## pyiceberg/table/inspect.py: ########## @@ -657,3 +665,62 @@ def all_manifests(self) -> "pa.Table": lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots] ) return pa.concat_tables(manifests_by_snapshots) + + def all_known_files(self) -> dict[str, set[str]]: + """Get all the known files in the table. + + Returns: + dict of {file_type: set of file paths} for each file type. 
+ """ + snapshots = self.tbl.snapshots() + + _all_known_files = {} + _all_known_files["manifests"] = set(self.all_manifests(snapshots)["path"].to_pylist()) + _all_known_files["manifest_lists"] = {snapshot.manifest_list for snapshot in snapshots} + _all_known_files["statistics"] = {statistic.statistics_path for statistic in self.tbl.metadata.statistics} + + executor = ExecutorFactory.get_or_create() + snapshot_ids = [snapshot.snapshot_id for snapshot in snapshots] + files_by_snapshots: Iterator[Set[str]] = executor.map( + lambda snapshot_id: set(self.files(snapshot_id)["file_path"].to_pylist()), snapshot_ids + ) + _all_known_files["datafiles"] = reduce(set.union, files_by_snapshots, set()) + + return _all_known_files + + def orphaned_files(self, location: str, older_than: Optional[timedelta] = timedelta(days=3)) -> Set[str]: + """Get all the orphaned files in the table. + + Args: + location: The location to check for orphaned files. + older_than: The time period to check for orphaned files. Defaults to 3 days. + + Returns: + A set of orphaned file paths. + + """ + try: + import pyarrow as pa # noqa: F401 + except ModuleNotFoundError as e: + raise ModuleNotFoundError("For deleting orphaned files PyArrow needs to be installed") from e + + from pyarrow.fs import FileSelector, FileType + + from pyiceberg.io.pyarrow import _fs_from_file_path + + all_known_files = self.all_known_files() + flat_known_files: set[str] = reduce(set.union, all_known_files.values(), set()) + + fs = _fs_from_file_path(self.tbl.io, location) + + _, _, path = _parse_location(location) + selector = FileSelector(path, recursive=True) + # filter to just files as it may return directories, and filter on time + as_of = datetime.now(timezone.utc) - older_than if older_than else None Review Comment: `older_than` should always be present, see the above comment -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org