rdblue commented on code in PR #6933:
URL: https://github.com/apache/iceberg/pull/6933#discussion_r1117790748


##########
python/pyiceberg/table/snapshots.py:
##########
@@ -117,12 +125,369 @@ def manifests(self, io: FileIO) -> List[ManifestFile]:
             return list(read_manifest_list(file))
         return []
 
+    def added_data_files(self, io: FileIO) -> Generator[DataFile, None, None]:
+        for manifest in self.manifests(io):
+            yield from [entry.data_file for entry in 
manifest.fetch_manifest_entry(io)]
+
 
 class MetadataLogEntry(IcebergBaseModel):
     metadata_file: str = Field(alias="metadata-file")
     timestamp_ms: int = Field(alias="timestamp-ms")
 
 
 class SnapshotLogEntry(IcebergBaseModel):
-    snapshot_id: str = Field(alias="snapshot-id")
+    snapshot_id: int = Field(alias="snapshot-id")
     timestamp_ms: int = Field(alias="timestamp-ms")
+
+
+def is_ancestor_of(table: "Table", snapshot_id: int, ancestor_snapshot_id: 
int) -> bool:
+    """
+    Returns whether ancestor_snapshot_id is an ancestor of snapshot_id using 
the given lookup function.
+
+    Args:
+        table: The table
+        snapshot_id: The snapshot id of the snapshot
+        ancestor_snapshot_id: The snapshot id of the possible ancestor
+
+    Returns:
+        True if it is an ancestor or not
+    """
+    snapshots = ancestors_of(snapshot_id, table.snapshot_by_id)
+    for snapshot in snapshots:
+        if snapshot.snapshot_id == ancestor_snapshot_id:
+            return True
+    return False
+
+
+def is_parent_ancestor_of(table: "Table", snapshot_id: int, 
ancestor_parent_snapshot_id: int) -> bool:
+    """
+    Returns whether some ancestor of snapshot_id has parent_id matches 
ancestor_parent_snapshot_id
+
+    Args:
+        table: The table
+        snapshot_id: The snapshot id of the snapshot
+        ancestor_parent_snapshot_id: The snapshot id of the possible parent 
ancestor
+
+    Returns:
+        True if there is an ancestor with a parent
+    """
+    snapshots = ancestors_of(snapshot_id, table.snapshot_by_id)
+    for snapshot in snapshots:
+        if snapshot.parent_snapshot_id == ancestor_parent_snapshot_id:
+            return True
+    return False
+
+
+def current_ancestors(table: "Table") -> Iterable[Snapshot]:
+    """
+    Returns an iterable that traverses the table's snapshots from the current 
to the last known ancestor.
+
+    Args:
+        table: The table
+
+    Returns:
+        An iterable of all the ancestors
+    """
+    if current_snapshot := table.current_snapshot():
+        return ancestors_of(current_snapshot, table.snapshot_by_id)
+    return []
+
+
+def current_ancestor_ids(table: "Table") -> Iterable[int]:
+    """
+    Return the snapshot IDs for the ancestors of the current table state.
+
+    Ancestor IDs are ordered by commit time, descending. The first ID is
+    the current snapshot, followed by its parent, and so on.
+
+    Args:
+        table: The table
+
+    Returns:
+        An iterable of all the snapshot IDs
+    """
+    if current_snapshot := table.current_snapshot():
+        return ancestor_ids(current_snapshot, table.snapshot_by_id)
+    return []
+
+
+def oldest_ancestor(table: "Table") -> Optional[Snapshot]:
+    """
+    Traverses the history of the table's current snapshot and finds the oldest 
Snapshot.
+
+    Args:
+        table: The table
+
+    Returns:
+        None if there is no current snapshot in the table, else the oldest 
Snapshot.
+    """
+    oldest_snapshot: Optional[Snapshot] = None
+
+    for snapshot in current_ancestors(table):
+        oldest_snapshot = snapshot
+
+    return oldest_snapshot
+
+
+def oldest_ancestor_of(table: "Table", snapshot_id: int) -> Optional[Snapshot]:
+    """
+    Traverses the history and finds the oldest ancestor of the specified 
snapshot.
+
+    Oldest ancestor is defined as the ancestor snapshot whose parent is null 
or has been
+    expired. If the specified snapshot has no parent or parent has been 
expired, the specified
+    snapshot itself is returned.
+
+    Args:
+        table: The table
+        snapshot_id: the ID of the snapshot to find the oldest ancestor
+
+    Returns:
+        None if there is no current snapshot in the table, else the oldest 
Snapshot.
+    """
+    oldest_snapshot: Optional[Snapshot] = None
+
+    for snapshot in ancestors_of(snapshot_id, table.snapshot_by_id):
+        oldest_snapshot = snapshot
+
+    return oldest_snapshot
+
+
+def oldest_ancestor_after(table: "Table", timestamp_ms: int) -> Snapshot:
+    """
+    Looks up the snapshot after a given point in time
+
+    Args:
+        table: The table
+        timestamp_ms: The timestamp in millis since the Unix epoch
+
+    Returns:
+        The snapshot after the given point in time
+
+    Raises:
+        ValueError: When there is no snapshot older than the given time
+    """
+    if last_snapshot := table.current_snapshot():
+        for snapshot in current_ancestors(table):
+            if snapshot.timestamp_ms < timestamp_ms:
+                return last_snapshot
+            elif snapshot.timestamp_ms == timestamp_ms:
+                return snapshot
+
+            last_snapshot = snapshot
+
+        if last_snapshot is not None and last_snapshot.parent_snapshot_id is 
None:
+            return last_snapshot
+
+    raise ValueError(f"Cannot find snapshot older than: {timestamp_ms}")
+
+
+def snapshots_ids_between(table: "Table", from_snapshot_id: int, 
to_snapshot_id: int) -> Iterable[int]:
+    """
+    Returns list of snapshot ids in the range - (fromSnapshotId, toSnapshotId]
+
+    This method assumes that fromSnapshotId is an ancestor of toSnapshotId.
+
+    Args:
+        table: The table
+        from_snapshot_id: The starting snapshot ID
+        to_snapshot_id: The ending snapshot ID
+
+    Returns:
+        The list of snapshot IDs that are between the given snapshot IDs
+    """
+
+    def lookup(snapshot_id: int) -> Optional[Snapshot]:
+        return table.snapshot_by_id(snapshot_id) if snapshot_id != 
from_snapshot_id else None
+
+    return ancestor_ids(table.snapshot_by_id(snapshot_id=to_snapshot_id), 
lookup)
+
+
+def ancestor_ids(latest_snapshot: Union[int, Snapshot], lookup: 
Callable[[int], Optional[Snapshot]]) -> Iterable[int]:
+    """
+    Returns list of the snapshot IDs of the ancestors
+
+    Args:
+        latest_snapshot: The snapshot where to start from
+        lookup: Lookup function to get the snapshot for the snapshot ID
+
+    Returns:
+        The list of snapshot IDs that are ancestor of the given snapshot
+    """
+
+    def get_id(snapshot: Snapshot) -> int:
+        return snapshot.snapshot_id
+
+    return map(get_id, ancestors_of(latest_snapshot, lookup))
+
+
+def ancestors_of(latest_snapshot: Union[int, Snapshot], lookup: 
Callable[[int], Optional[Snapshot]]) -> Iterable[Snapshot]:
+    """
+    Returns list of snapshot that are ancestor of the given snapshot
+
+    Args:
+        latest_snapshot: The snapshot where to start from
+        lookup: Lookup function to get the snapshot for the snapshot ID
+
+    Returns:
+        The list of snapshots that are ancestor of the given snapshot
+    """
+    if isinstance(latest_snapshot, int):
+        start = lookup(latest_snapshot)
+        if start is None:
+            raise ValueError(f"Cannot find snapshot: {latest_snapshot}")

Review Comment:
   I think `lookup` will usually be `table.snapshot_by_id`, and this PR changes 
that method to raise `ValueError` instead of returning `None`. Doesn't that 
conflict with the `start is None` check here, which could then never be true 
for that lookup? Why change the behavior of `snapshot_by_id`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to