hililiwei commented on code in PR #533: URL: https://github.com/apache/iceberg-python/pull/533#discussion_r1626843860
########## pyiceberg/table/__init__.py: ########## @@ -1754,6 +1788,134 @@ def to_arrow(self) -> pa.Table: def to_pandas(self, **kwargs: Any) -> pd.DataFrame: return self.to_arrow().to_pandas(**kwargs) + def use_ref(self: S, name: str) -> S: + if self.snapshot_id: # type: ignore + raise ValueError(f"Cannot override ref, already set snapshot id={self.snapshot_id}") # type: ignore + if snapshot := self.table_metadata.snapshot_by_name(name): + return self.update(snapshot_id=snapshot.snapshot_id) + + raise ValueError(f"Cannot scan unknown ref={name}") + + def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + import duckdb + + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow()) + + return con + + def to_ray(self) -> ray.data.dataset.Dataset: + import ray + + return ray.data.from_arrow(self.to_arrow()) + + +class BaseIncrementalScan(TableScan): + """Base class for incremental scans. + + Args: + to_snapshot_id: The end snapshot ID (inclusive). + from_snapshot_id_exclusive: The start snapshot ID (exclusive). + """ + + to_snapshot_id: Optional[int] + from_snapshot_id_exclusive: Optional[int] + + def __init__( + self, + table_metadata: TableMetadata, + io: FileIO, + row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE, + selected_fields: Tuple[str, ...] = ("*",), + case_sensitive: bool = True, + options: Properties = EMPTY_DICT, + limit: Optional[int] = None, + to_snapshot_id: Optional[int] = None, + from_snapshot_id_exclusive: Optional[int] = None, + ): + super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit) + self.to_snapshot_id = to_snapshot_id + self.from_snapshot_id_exclusive = from_snapshot_id_exclusive + + def to_snapshot(self: S, to_snapshot_id: int) -> S: + """Instructs this scan to look for changes up to a particular snapshot (inclusive). 
+ + If the end snapshot is not configured, it defaults to the current table snapshot (inclusive). + + Args: + to_snapshot_id: the end snapshot ID (inclusive) + + Returns: + this for method chaining + """ + return self.update(to_snapshot_id=to_snapshot_id) + + def from_snapshot_exclusive(self: S, from_snapshot_id: int) -> S: + """Instructs this scan to look for changes starting from a particular snapshot (exclusive). + + If the start snapshot is not configured, it defaults to the oldest ancestor of the end snapshot (inclusive). + + Args: + from_snapshot_id: the start snapshot ID (exclusive) + + Returns: + this for method chaining + """ + return self.update(from_snapshot_id_exclusive=from_snapshot_id) + + def use_ref(self: S, name: str) -> S: + raise NotImplementedError("Not implemented for IncrementalScan yet.") + + def projection(self) -> Schema: + current_schema = self.table_metadata.schema() + if "*" in self.selected_fields: + return current_schema + + return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive) + + @abstractmethod + def _do_plan_files(self, from_snapshot_id_exclusive: int, to_snapshot_id: int) -> Iterable[FileScanTask]: ... + + def plan_files(self) -> Iterable[FileScanTask]: Review Comment: Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org