rdblue commented on code in PR #6933: URL: https://github.com/apache/iceberg/pull/6933#discussion_r1117791838
########## python/pyiceberg/table/snapshots.py: ########## @@ -117,12 +125,369 @@ def manifests(self, io: FileIO) -> List[ManifestFile]: return list(read_manifest_list(file)) return [] + def added_data_files(self, io: FileIO) -> Generator[DataFile, None, None]: + for manifest in self.manifests(io): + yield from [entry.data_file for entry in manifest.fetch_manifest_entry(io)] + class MetadataLogEntry(IcebergBaseModel): metadata_file: str = Field(alias="metadata-file") timestamp_ms: int = Field(alias="timestamp-ms") class SnapshotLogEntry(IcebergBaseModel): - snapshot_id: str = Field(alias="snapshot-id") + snapshot_id: int = Field(alias="snapshot-id") timestamp_ms: int = Field(alias="timestamp-ms") + + +def is_ancestor_of(table: "Table", snapshot_id: int, ancestor_snapshot_id: int) -> bool: + """ + Returns whether ancestor_snapshot_id is an ancestor of snapshot_id using the given lookup function. + + Args: + table: The table + snapshot_id: The snapshot id of the snapshot + ancestor_snapshot_id: The snapshot id of the possible ancestor + + Returns: + True if it is an ancestor or not + """ + snapshots = ancestors_of(snapshot_id, table.snapshot_by_id) + for snapshot in snapshots: + if snapshot.snapshot_id == ancestor_snapshot_id: + return True + return False + + +def is_parent_ancestor_of(table: "Table", snapshot_id: int, ancestor_parent_snapshot_id: int) -> bool: + """ + Returns whether some ancestor of snapshot_id has parent_id matches ancestor_parent_snapshot_id + + Args: + table: The table + snapshot_id: The snapshot id of the snapshot + ancestor_parent_snapshot_id: The snapshot id of the possible parent ancestor + + Returns: + True if there is an ancestor with a parent + """ + snapshots = ancestors_of(snapshot_id, table.snapshot_by_id) + for snapshot in snapshots: + if snapshot.parent_snapshot_id == ancestor_parent_snapshot_id: + return True + return False + + +def current_ancestors(table: "Table") -> Iterable[Snapshot]: + """ + Returns an iterable that traverses 
the table's snapshots from the current to the last known ancestor. + + Args: + table: The table + + Returns: + An iterable of all the ancestors + """ + if current_snapshot := table.current_snapshot(): + return ancestors_of(current_snapshot, table.snapshot_by_id) + return [] + + +def current_ancestor_ids(table: "Table") -> Iterable[int]: + """ + Return the snapshot IDs for the ancestors of the current table state. + + Ancestor IDs are ordered by commit time, descending. The first ID is + the current snapshot, followed by its parent, and so on. + + Args: + table: The table + + Returns: + An iterable of all the snapshot IDs + """ + if current_snapshot := table.current_snapshot(): + return ancestor_ids(current_snapshot, table.snapshot_by_id) + return [] + + +def oldest_ancestor(table: "Table") -> Optional[Snapshot]: + """ + Traverses the history of the table's current snapshot and finds the oldest Snapshot. + + Args: + table: The table + + Returns: + None if there is no current snapshot in the table, else the oldest Snapshot. + """ + oldest_snapshot: Optional[Snapshot] = None + + for snapshot in current_ancestors(table): + oldest_snapshot = snapshot + + return oldest_snapshot + + +def oldest_ancestor_of(table: "Table", snapshot_id: int) -> Optional[Snapshot]: + """ + Traverses the history and finds the oldest ancestor of the specified snapshot. + + Oldest ancestor is defined as the ancestor snapshot whose parent is null or has been + expired. If the specified snapshot has no parent or parent has been expired, the specified + snapshot itself is returned. + + Args: + table: The table + snapshot_id: the ID of the snapshot to find the oldest ancestor + + Returns: + None if there is no current snapshot in the table, else the oldest Snapshot. 
+ """ + oldest_snapshot: Optional[Snapshot] = None + + for snapshot in ancestors_of(snapshot_id, table.snapshot_by_id): + oldest_snapshot = snapshot + + return oldest_snapshot + + +def oldest_ancestor_after(table: "Table", timestamp_ms: int) -> Snapshot: + """ + Looks up the snapshot after a given point in time + + Args: + table: The table + timestamp_ms: The timestamp in millis since the Unix epoch + + Returns: + The snapshot after the given point in time + + Raises: + ValueError: When there is no snapshot older than the given time + """ + if last_snapshot := table.current_snapshot(): + for snapshot in current_ancestors(table): + if snapshot.timestamp_ms < timestamp_ms: + return last_snapshot + elif snapshot.timestamp_ms == timestamp_ms: + return snapshot + + last_snapshot = snapshot + + if last_snapshot is not None and last_snapshot.parent_snapshot_id is None: + return last_snapshot + + raise ValueError(f"Cannot find snapshot older than: {timestamp_ms}") + + +def snapshots_ids_between(table: "Table", from_snapshot_id: int, to_snapshot_id: int) -> Iterable[int]: + """ + Returns list of snapshot ids in the range - (fromSnapshotId, toSnapshotId] + + This method assumes that fromSnapshotId is an ancestor of toSnapshotId. 
+ + Args: + table: The table + from_snapshot_id: The starting snapshot ID + to_snapshot_id: The ending snapshot ID + + Returns: + The list of snapshot IDs that are between the given snapshot IDs + """ + + def lookup(snapshot_id: int) -> Optional[Snapshot]: + return table.snapshot_by_id(snapshot_id) if snapshot_id != from_snapshot_id else None + + return ancestor_ids(table.snapshot_by_id(snapshot_id=to_snapshot_id), lookup) + + +def ancestor_ids(latest_snapshot: Union[int, Snapshot], lookup: Callable[[int], Optional[Snapshot]]) -> Iterable[int]: + """ + Returns list of the snapshot IDs of the ancestors + + Args: + latest_snapshot: The snapshot where to start from + lookup: Lookup function to get the snapshot for the snapshot ID + + Returns: + The list of snapshot IDs that are ancestor of the given snapshot + """ + + def get_id(snapshot: Snapshot) -> int: + return snapshot.snapshot_id + + return map(get_id, ancestors_of(latest_snapshot, lookup)) + + +def ancestors_of(latest_snapshot: Union[int, Snapshot], lookup: Callable[[int], Optional[Snapshot]]) -> Iterable[Snapshot]: + """ + Returns list of snapshot that are ancestor of the given snapshot + + Args: + latest_snapshot: The snapshot where to start from + lookup: Lookup function to get the snapshot for the snapshot ID + + Returns: + The list of snapshots that are ancestor of the given snapshot + """ + if isinstance(latest_snapshot, int): + start = lookup(latest_snapshot) + if start is None: + raise ValueError(f"Cannot find snapshot: {latest_snapshot}") + else: + start = latest_snapshot + + def snapshot_generator() -> Generator[Snapshot, None, None]: Review Comment: Why is this an inline method rather than yielding from `ancestors_of`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscribe@iceberg.apache.org For additional commands, e-mail: issues-help@iceberg.apache.org