HonahX commented on code in PR #748:
URL: https://github.com/apache/iceberg-python/pull/748#discussion_r1616606353

##########
pyiceberg/table/__init__.py:
##########
@@ -1290,6 +1291,19 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
             return self.snapshot_by_id(ref.snapshot_id)
         return None

+    def latest_snapshot_before_timestamp(self, timestamp_ms: int) -> Optional[Snapshot]:
+        """Get the snapshot right before the given timestamp, or None if there is no matching snapshot."""
+        result, prev_timestamp = None, 0
+        if self.metadata.current_snapshot_id is not None:
+            for snapshot in self.current_ancestors():
+                if snapshot and prev_timestamp < snapshot.timestamp_ms < timestamp_ms:

Review Comment:
   ```suggestion
                   if snapshot and prev_timestamp < snapshot.timestamp_ms <= timestamp_ms:
   ```
   I think we could also include the given timestamp, so that the method returns the latest snapshot up to and including the given timestamp.

   Also, how about implementing this by iterating over `table.history()`? The `snapshot_log` field in the metadata contains a list of `snapshot_id` + `timestamp` pairs, so we do not need to re-generate the ancestors of the current snapshot.

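To make the `table.history()` idea concrete, here is a minimal sketch of an inclusive "as of" lookup over log entries. It is an illustration under assumptions, not the PR's implementation: `LogEntry` and `latest_snapshot_id_as_of` are hypothetical names, and `LogEntry` is a stand-in for a snapshot-log entry that carries only a `snapshot_id` and a `timestamp_ms`.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class LogEntry:
    """Hypothetical stand-in for one snapshot-log entry (id + timestamp pair)."""

    snapshot_id: int
    timestamp_ms: int


def latest_snapshot_id_as_of(history: Iterable[LogEntry], timestamp_ms: int) -> Optional[int]:
    """Return the id of the latest entry whose timestamp is <= timestamp_ms, or None."""
    result: Optional[LogEntry] = None
    for entry in history:
        # Inclusive upper bound: an entry exactly at timestamp_ms qualifies.
        if entry.timestamp_ms > timestamp_ms:
            continue
        # Keep the most recent qualifying entry; do not assume the log is sorted.
        if result is None or entry.timestamp_ms >= result.timestamp_ms:
            result = entry
    return result.snapshot_id if result is not None else None


# Sample data for illustration only.
history = [LogEntry(1, 1000), LogEntry(2, 2000), LogEntry(3, 3000)]
```

With this shape the lookup needs only the log entries, so no ancestor chain has to be rebuilt; the caller can resolve the returned id to a snapshot afterwards.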
##########
pyiceberg/table/snapshots.py:
##########
@@ -412,3 +417,12 @@ def _update_totals(total_property: str, added_property: str, removed_property: s
 def set_when_positive(properties: Dict[str, str], num: int, property_name: str) -> None:
     if num > 0:
         properties[property_name] = str(num)
+
+
+def ancestors_of(current_snapshot: Snapshot, table_metadata: TableMetadata) -> Iterable[Snapshot]:
+    """Get the ancestors of and including the given snapshot."""
+    if current_snapshot:
+        yield current_snapshot
+    if current_snapshot.parent_snapshot_id is not None:
+        if parent := table_metadata.snapshot_by_id(current_snapshot.parent_snapshot_id):
+            yield from ancestors_of(parent, table_metadata)

Review Comment:
   ```suggestion
       if current_snapshot:
           yield current_snapshot
           if current_snapshot.parent_snapshot_id is not None:
               if parent := table_metadata.snapshot_by_id(current_snapshot.parent_snapshot_id):
                   yield from ancestors_of(parent, table_metadata)
   ```
   Otherwise it fails if `current_snapshot` is None.

##########
pyiceberg/table/__init__.py:
##########
@@ -1290,6 +1291,19 @@ def snapshot_by_name(self, name: str) -> Optional[Snapshot]:
             return self.snapshot_by_id(ref.snapshot_id)
         return None

+    def latest_snapshot_before_timestamp(self, timestamp_ms: int) -> Optional[Snapshot]:
+        """Get the snapshot right before the given timestamp, or None if there is no matching snapshot."""
+        result, prev_timestamp = None, 0
+        if self.metadata.current_snapshot_id is not None:
+            for snapshot in self.current_ancestors():
+                if snapshot and prev_timestamp < snapshot.timestamp_ms < timestamp_ms:
+                    result, prev_timestamp = snapshot, snapshot.timestamp_ms
+        return result
+
+    def current_ancestors(self) -> List[Optional[Snapshot]]:

Review Comment:
   I think it should be `List[Snapshot]`. We return an empty list when the table does not have a current snapshot.

   BTW, I would prefer to make the return type the same as `ancestors_of` (`Iterable[Snapshot]`) here, so that we generate the ancestors as needed.
   Is there any use-case for this API that we may prefer a list?
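As a rough illustration of the `Iterable[Snapshot]` preference, the sketch below walks the parent chain lazily and yields nothing for a `None` input. It is only a sketch under assumptions: `Snap` and the `by_id` dictionary are hypothetical stand-ins for the snapshot type and the metadata lookup, and the traversal is written as a loop rather than the recursive form in the diff.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Optional


@dataclass(frozen=True)
class Snap:
    """Hypothetical stand-in for a snapshot with an optional parent id."""

    snapshot_id: int
    parent_snapshot_id: Optional[int] = None


def ancestors_of(current: Optional[Snap], by_id: Dict[int, Snap]) -> Iterable[Snap]:
    """Lazily yield the given snapshot followed by its ancestors, newest first."""
    snapshot = current
    while snapshot is not None:
        yield snapshot
        parent_id = snapshot.parent_snapshot_id
        # Stop when there is no parent id or the parent is not in the lookup.
        snapshot = by_id.get(parent_id) if parent_id is not None else None


# Sample chain for illustration only: 3 -> 2 -> 1.
snaps = {1: Snap(1), 2: Snap(2, 1), 3: Snap(3, 2)}
```

A caller that really needs a list can wrap the call in `list(...)`, while a caller that stops early (for example, at the first snapshot older than a timestamp) never walks the rest of the chain.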