hililiwei commented on code in PR #533: URL: https://github.com/apache/iceberg-python/pull/533#discussion_r1626799873
########## pyiceberg/table/__init__.py: ########## @@ -1754,6 +1788,134 @@ def to_arrow(self) -> pa.Table: def to_pandas(self, **kwargs: Any) -> pd.DataFrame: return self.to_arrow().to_pandas(**kwargs) + def use_ref(self: S, name: str) -> S: + if self.snapshot_id: # type: ignore + raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") # type: ignore + if snapshot := self.table_metadata.snapshot_by_name(name): + return self.update(snapshot_id=snapshot.snapshot_id) + + raise ValueError(f"Cannot scan unknown ref={name}") + + def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + import duckdb + + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow()) + + return con + + def to_ray(self) -> ray.data.dataset.Dataset: + import ray + + return ray.data.from_arrow(self.to_arrow()) + + +class BaseIncrementalScan(TableScan): + """Base class for incremental scans. + + Args: + to_snapshot_id: The end snapshot ID (inclusive). + from_snapshot_id_exclusive: The start snapshot ID (exclusive). + """ + + to_snapshot_id: Optional[int] + from_snapshot_id_exclusive: Optional[int] + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + to_snapshot_id: Optional[int] = None, + from_snapshot_id_exclusive: Optional[int] = None, + ): + super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) + self.to_snapshot_id = to_snapshot_id + self.from_snapshot_id_exclusive = from_snapshot_id_exclusive + + def to_snapshot(self: S, to_snapshot_id: int) -> S: + """Instructs this scan to look for changes up to a particular snapshot (inclusive). 
+ + If the end snapshot is not configured, it defaults to the current table snapshot (inclusive). + + Args: + to_snapshot_id: the end snapshot ID (inclusive) + + Returns: + this for method chaining + """ + return self.update(to_snapshot_id=to_snapshot_id) Review Comment: When the `to_snapshot_id` parameter is not set, we assume that the user wants the latest data in the table, so I fall back to the table's current snapshot ID: ``` if self.to_snapshot_id is None: current_snapshot = self.table_metadata.current_snapshot() if current_snapshot is None: raise ValueError("End snapshot is not set and table has no current snapshot") self.to_snapshot_id = current_snapshot.snapshot_id ``` Currently this is done in `plan_files()`, but it could also be moved earlier, into `__init__`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org