hililiwei commented on code in PR #533:
URL: https://github.com/apache/iceberg-python/pull/533#discussion_r1626799873


##########
pyiceberg/table/__init__.py:
##########
@@ -1754,6 +1788,134 @@ def to_arrow(self) -> pa.Table:
     def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
         return self.to_arrow().to_pandas(**kwargs)
 
+    def use_ref(self: S, name: str) -> S:
+        if self.snapshot_id:  # type: ignore
+            raise ValueError(f"Cannot override ref, already set snapshot 
id={self.snapshot_id}")  # type: ignore
+        if snapshot := self.table_metadata.snapshot_by_name(name):
+            return self.update(snapshot_id=snapshot.snapshot_id)
+
+        raise ValueError(f"Cannot scan unknown ref={name}")
+
+    def to_duckdb(self, table_name: str, connection: 
Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+        import duckdb
+
+        con = connection or duckdb.connect(database=":memory:")
+        con.register(table_name, self.to_arrow())
+
+        return con
+
+    def to_ray(self) -> ray.data.dataset.Dataset:
+        import ray
+
+        return ray.data.from_arrow(self.to_arrow())
+
+
+class BaseIncrementalScan(TableScan):
+    """Base class for incremental scans.
+
+    Args:
+        to_snapshot_id: The end snapshot ID (inclusive).
+        from_snapshot_id_exclusive: The start snapshot ID (exclusive).
+    """
+
+    to_snapshot_id: Optional[int]
+    from_snapshot_id_exclusive: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+        to_snapshot_id: Optional[int] = None,
+        from_snapshot_id_exclusive: Optional[int] = None,
+    ):
+        super().__init__(table_metadata, io, row_filter, selected_fields, 
case_sensitive, options, limit)
+        self.to_snapshot_id = to_snapshot_id
+        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+
+    def to_snapshot(self: S, to_snapshot_id: int) -> S:
+        """Instructs this scan to look for changes up to a particular snapshot 
(inclusive).
+
+        If the end snapshot is not configured, it defaults to the current 
table snapshot (inclusive).
+
+        Args:
+            to_snapshot_id: the end snapshot ID (inclusive)
+
+        Returns:
+            this for method chaining
+        """
+        return self.update(to_snapshot_id=to_snapshot_id)

Review Comment:
   When the parameter `to_snapshot_id` is not set, we assume that the user 
intends to read up to the latest data in the table, so I fall back to the 
snapshot ID of the table's current snapshot:
   
   ```
           if self.to_snapshot_id is None:
               current_snapshot = self.table_metadata.current_snapshot()
               if current_snapshot is None:
                   raise ValueError("End snapshot is not set and table has no 
current snapshot")
               self.to_snapshot_id = current_snapshot.snapshot_id
   ```
   
   Currently, this is done in `plan_files()`, but we could also move it 
earlier, into `__init__`.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to