smaheshwar-pltr commented on code in PR #3364:
URL: https://github.com/apache/iceberg-python/pull/3364#discussion_r3260051114


##########
pyiceberg/table/__init__.py:
##########
@@ -2103,116 +2189,346 @@ def plan_files(self) -> Iterable[FileScanTask]:
             return self._plan_files_server_side()
         return self._plan_files_local()
 
-    def to_arrow(self) -> pa.Table:
-        """Read an Arrow table eagerly from this DataScan.
+    def count(self) -> int:
+        from pyiceberg.io.pyarrow import ArrowScan
 
-        All rows will be loaded into memory at once.
+        # Usage: Calculates the total number of records in a Scan that haven't 
had positional deletes.
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
 
-        Returns:
-            pa.Table: Materialized Arrow Table from the Iceberg table's 
DataScan
-        """
-        from pyiceberg.io.pyarrow import ArrowScan
+        for task in tasks:
+            # task.residual is a Boolean Expression if the filter condition is 
fully satisfied by the
+            # partition value and task.delete_files represents that positional 
delete haven't been merged yet
+            # hence those files have to read as a pyarrow table applying the 
filter and deletes
+            if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+                # Every File has a metadata stat that stores the file record 
count
+                res += task.file.record_count
+            else:
+                arrow_scan = ArrowScan(
+                    table_metadata=self.table_metadata,
+                    io=self.io,
+                    projected_schema=self.projection(),
+                    row_filter=self.row_filter,
+                    case_sensitive=self.case_sensitive,
+                )
+                tbl = arrow_scan.to_table([task])
+                res += len(tbl)
+        return res
 
-        return ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, 
self.case_sensitive, self.limit
-        ).to_table(self.plan_files())
 
-    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
-        """Return an Arrow RecordBatchReader from this DataScan.
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
 
-        For large results, using a RecordBatchReader requires less memory than
-        loading an Arrow Table for the same DataScan, because a RecordBatch
-        is read one at a time.
 
-        Returns:
-            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg 
table's DataScan
-                which can be used to read a stream of record batches one by 
one.
-        """
-        import pyarrow as pa
+class IncrementalAppendScan(BaseScan):
+    """An incremental scan of a table's data that accumulates appended data 
between two snapshots.
 
-        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+        from_snapshot_id_exclusive:
+            Optional ID of the "from" snapshot, to start the incremental scan 
from, exclusively. When the scan is
+            ultimately planned, this must not be None. The snapshot does not 
need to be present in the table metadata
+            (it may have been expired), as long as it is the parent of some 
ancestor of the "to" snapshot.
+        to_snapshot_id_inclusive:
+            Optional ID of the "to" snapshot, to end the incremental scan at, 
inclusively.
+            Omitting it will default to the table's current snapshot.
+    """
 
-        target_schema = schema_to_pyarrow(self.projection())
-        batches = ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, 
self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files())
+    from_snapshot_id_exclusive: int | None
+    to_snapshot_id_inclusive: int | None
 
-        return pa.RecordBatchReader.from_batches(
-            target_schema,
-            batches,
-        ).cast(target_schema)
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: str | BooleanExpression = ALWAYS_TRUE,
+        selected_fields: tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+        limit: int | None = None,
+        from_snapshot_id_exclusive: int | None = None,
+        to_snapshot_id_inclusive: int | None = None,
+    ):
+        super().__init__(
+            table_metadata=table_metadata,
+            io=io,
+            row_filter=row_filter,
+            selected_fields=selected_fields,
+            case_sensitive=case_sensitive,
+            options=options,
+            limit=limit,
+        )
+        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+        self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
 
-    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
-        """Read a Pandas DataFrame eagerly from this Iceberg table.
+    def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int | 
None) -> IAS:
+        """Instructs this scan to look for changes starting from a particular 
snapshot (exclusive).
+
+        Args:
+            from_snapshot_id_exclusive: the start snapshot ID (exclusive)
 
         Returns:
-            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
+            this for method chaining
         """
-        return self.to_arrow().to_pandas(**kwargs)
+        return 
self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
 
-    def to_duckdb(self, table_name: str, connection: DuckDBPyConnection | None 
= None) -> DuckDBPyConnection:
-        """Shorthand for loading the Iceberg Table in DuckDB.
+    def to_snapshot_inclusive(self: IAS, to_snapshot_id_inclusive: int | None) 
-> IAS:
+        """Instructs this scan to look for changes up to a particular snapshot 
(inclusive).
+
+        Args:
+            to_snapshot_id_inclusive: the end snapshot ID (inclusive)
 
         Returns:
-            DuckDBPyConnection: In memory DuckDB connection with the Iceberg 
table.
+            this for method chaining
         """
-        import duckdb
+        return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
 
-        con = connection or duckdb.connect(database=":memory:")
-        con.register(table_name, self.to_arrow())
+    def projection(self) -> Schema:
+        current_schema = self.table_metadata.schema()
+        if "*" in self.selected_fields:
+            return current_schema
+        return current_schema.select(*self.selected_fields, 
case_sensitive=self.case_sensitive)
 
-        return con
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plans the relevant files added between the specified snapshots."""
+        from_snapshot_id, to_snapshot_id = 
self._validate_and_resolve_snapshots()
+
+        append_snapshots = [
+            snapshot
+            for snapshot in ancestors_between_ids(
+                from_snapshot_id_exclusive=from_snapshot_id,
+                to_snapshot_id_inclusive=to_snapshot_id,
+                table_metadata=self.table_metadata,
+            )
+            if snapshot.summary is not None and snapshot.summary.operation == 
Operation.APPEND
+        ]
+        if len(append_snapshots) == 0:
+            return iter([])
 
-    def to_ray(self) -> ray.data.dataset.Dataset:
-        """Read a Ray Dataset eagerly from this Iceberg table.
+        append_snapshot_ids = {snapshot.snapshot_id for snapshot in 
append_snapshots}
 
-        Returns:
-            ray.data.dataset.Dataset: Materialized Ray Dataset from the 
Iceberg table
-        """
-        import ray
+        manifests = list(
+            {
+                manifest_file
+                for snapshot in append_snapshots
+                for manifest_file in snapshot.manifests(self.io)
+                if manifest_file.content == ManifestContent.DATA and 
manifest_file.added_snapshot_id in append_snapshot_ids
+            }
+        )
 
-        return ray.data.from_arrow(self.to_arrow())
+        return ManifestGroupPlanner(
+            table_metadata=self.table_metadata,
+            io=self.io,
+            row_filter=self.row_filter,
+            case_sensitive=self.case_sensitive,
+            options=self.options,
+        ).plan_files(
+            manifests=manifests,
+            manifest_entry_filter=lambda manifest_entry: 
manifest_entry.snapshot_id in append_snapshot_ids
+            and manifest_entry.status == ManifestEntryStatus.ADDED,
+        )
 
-    def to_polars(self) -> pl.DataFrame:
-        """Read a Polars DataFrame from this Iceberg table.
+    def _validate_and_resolve_snapshots(self) -> tuple[int, int]:

Review Comment:
   Two semantic notes:
   
   1. `from` (exclusive) is validated via `is_parent_ancestor_of`, not 
`is_ancestor_of` — matches Java's 
[`BaseIncrementalScan.fromSnapshotIdExclusive`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java#L177-L185)
 (see the inline comment there about expiry) and C++'s 
[`internal::FromSnapshotIdExclusive`](https://github.com/apache/iceberg-cpp/blob/fc80e4bdbafcd659e4b44fb9fb8ae7960a08c2d1/src/iceberg/table_scan.cc#L249-L259).
 This admits cursors whose `from` snapshot has since been expired (canonical 
incremental-ingestion pattern); fabricated IDs are still rejected.
   2. Equal `from`/`to` raises (a snapshot is never its own parent ancestor), 
again matching Java/C++.



##########
pyiceberg/table/__init__.py:
##########
@@ -2103,116 +2189,346 @@ def plan_files(self) -> Iterable[FileScanTask]:
             return self._plan_files_server_side()
         return self._plan_files_local()
 
-    def to_arrow(self) -> pa.Table:
-        """Read an Arrow table eagerly from this DataScan.
+    def count(self) -> int:
+        from pyiceberg.io.pyarrow import ArrowScan
 
-        All rows will be loaded into memory at once.
+        # Usage: Calculates the total number of records in a Scan that haven't 
had positional deletes.
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
 
-        Returns:
-            pa.Table: Materialized Arrow Table from the Iceberg table's 
DataScan
-        """
-        from pyiceberg.io.pyarrow import ArrowScan
+        for task in tasks:
+            # task.residual is a Boolean Expression if the filter condition is 
fully satisfied by the
+            # partition value and task.delete_files represents that positional 
delete haven't been merged yet
+            # hence those files have to read as a pyarrow table applying the 
filter and deletes
+            if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+                # Every File has a metadata stat that stores the file record 
count
+                res += task.file.record_count
+            else:
+                arrow_scan = ArrowScan(
+                    table_metadata=self.table_metadata,
+                    io=self.io,
+                    projected_schema=self.projection(),
+                    row_filter=self.row_filter,
+                    case_sensitive=self.case_sensitive,
+                )
+                tbl = arrow_scan.to_table([task])
+                res += len(tbl)
+        return res
 
-        return ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, 
self.case_sensitive, self.limit
-        ).to_table(self.plan_files())
 
-    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
-        """Return an Arrow RecordBatchReader from this DataScan.
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
 
-        For large results, using a RecordBatchReader requires less memory than
-        loading an Arrow Table for the same DataScan, because a RecordBatch
-        is read one at a time.
 
-        Returns:
-            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg 
table's DataScan
-                which can be used to read a stream of record batches one by 
one.
-        """
-        import pyarrow as pa
+class IncrementalAppendScan(BaseScan):
+    """An incremental scan of a table's data that accumulates appended data 
between two snapshots.
 
-        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+        from_snapshot_id_exclusive:
+            Optional ID of the "from" snapshot, to start the incremental scan 
from, exclusively. When the scan is
+            ultimately planned, this must not be None. The snapshot does not 
need to be present in the table metadata
+            (it may have been expired), as long as it is the parent of some 
ancestor of the "to" snapshot.
+        to_snapshot_id_inclusive:
+            Optional ID of the "to" snapshot, to end the incremental scan at, 
inclusively.
+            Omitting it will default to the table's current snapshot.
+    """
 
-        target_schema = schema_to_pyarrow(self.projection())
-        batches = ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, 
self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files())
+    from_snapshot_id_exclusive: int | None
+    to_snapshot_id_inclusive: int | None
 
-        return pa.RecordBatchReader.from_batches(
-            target_schema,
-            batches,
-        ).cast(target_schema)
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: str | BooleanExpression = ALWAYS_TRUE,
+        selected_fields: tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+        limit: int | None = None,
+        from_snapshot_id_exclusive: int | None = None,
+        to_snapshot_id_inclusive: int | None = None,
+    ):
+        super().__init__(
+            table_metadata=table_metadata,
+            io=io,
+            row_filter=row_filter,
+            selected_fields=selected_fields,
+            case_sensitive=case_sensitive,
+            options=options,
+            limit=limit,
+        )
+        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+        self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
 
-    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
-        """Read a Pandas DataFrame eagerly from this Iceberg table.
+    def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int | 
None) -> IAS:
+        """Instructs this scan to look for changes starting from a particular 
snapshot (exclusive).
+
+        Args:
+            from_snapshot_id_exclusive: the start snapshot ID (exclusive)
 
         Returns:
-            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
+            this for method chaining
         """
-        return self.to_arrow().to_pandas(**kwargs)
+        return 
self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
 
-    def to_duckdb(self, table_name: str, connection: DuckDBPyConnection | None 
= None) -> DuckDBPyConnection:
-        """Shorthand for loading the Iceberg Table in DuckDB.
+    def to_snapshot_inclusive(self: IAS, to_snapshot_id_inclusive: int | None) 
-> IAS:
+        """Instructs this scan to look for changes up to a particular snapshot 
(inclusive).
+
+        Args:
+            to_snapshot_id_inclusive: the end snapshot ID (inclusive)
 
         Returns:
-            DuckDBPyConnection: In memory DuckDB connection with the Iceberg 
table.
+            this for method chaining
         """
-        import duckdb
+        return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
 
-        con = connection or duckdb.connect(database=":memory:")
-        con.register(table_name, self.to_arrow())
+    def projection(self) -> Schema:
+        current_schema = self.table_metadata.schema()
+        if "*" in self.selected_fields:
+            return current_schema
+        return current_schema.select(*self.selected_fields, 
case_sensitive=self.case_sensitive)
 
-        return con
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plans the relevant files added between the specified snapshots."""
+        from_snapshot_id, to_snapshot_id = 
self._validate_and_resolve_snapshots()
+
+        append_snapshots = [
+            snapshot
+            for snapshot in ancestors_between_ids(
+                from_snapshot_id_exclusive=from_snapshot_id,
+                to_snapshot_id_inclusive=to_snapshot_id,
+                table_metadata=self.table_metadata,
+            )
+            if snapshot.summary is not None and snapshot.summary.operation == 
Operation.APPEND
+        ]
+        if len(append_snapshots) == 0:
+            return iter([])
 
-    def to_ray(self) -> ray.data.dataset.Dataset:
-        """Read a Ray Dataset eagerly from this Iceberg table.
+        append_snapshot_ids = {snapshot.snapshot_id for snapshot in 
append_snapshots}
 
-        Returns:
-            ray.data.dataset.Dataset: Materialized Ray Dataset from the 
Iceberg table
-        """
-        import ray
+        manifests = list(
+            {
+                manifest_file
+                for snapshot in append_snapshots
+                for manifest_file in snapshot.manifests(self.io)
+                if manifest_file.content == ManifestContent.DATA and 
manifest_file.added_snapshot_id in append_snapshot_ids
+            }
+        )
 
-        return ray.data.from_arrow(self.to_arrow())
+        return ManifestGroupPlanner(
+            table_metadata=self.table_metadata,
+            io=self.io,
+            row_filter=self.row_filter,
+            case_sensitive=self.case_sensitive,
+            options=self.options,
+        ).plan_files(
+            manifests=manifests,
+            manifest_entry_filter=lambda manifest_entry: 
manifest_entry.snapshot_id in append_snapshot_ids
+            and manifest_entry.status == ManifestEntryStatus.ADDED,
+        )
 
-    def to_polars(self) -> pl.DataFrame:
-        """Read a Polars DataFrame from this Iceberg table.
+    def _validate_and_resolve_snapshots(self) -> tuple[int, int]:
+        if self.from_snapshot_id_exclusive is None:
+            raise ValueError("Start snapshot is not set, please set 
from_snapshot_id_exclusive")
 
-        Returns:
-            pl.DataFrame: Materialized Polars Dataframe from the Iceberg table
+        if self.to_snapshot_id_inclusive is None:
+            current_snapshot = self.table_metadata.current_snapshot()
+            if current_snapshot is None:
+                raise ValueError("End snapshot is not set and table has no 
current snapshot")
+            to_snapshot_id = current_snapshot.snapshot_id
+        else:
+            if 
self.table_metadata.snapshot_by_id(self.to_snapshot_id_inclusive) is None:
+                raise ValueError(f"End snapshot not found in table metadata: 
{self.to_snapshot_id_inclusive}")
+            to_snapshot_id = self.to_snapshot_id_inclusive
+
+        # The start snapshot is exclusive, so it does not need to be present 
in the table metadata
+        # (it may have been expired). It is valid as long as it is the parent 
of some ancestor of
+        # the end snapshot.
+        if not is_parent_ancestor_of(to_snapshot_id, 
self.from_snapshot_id_exclusive, self.table_metadata):
+            raise ValueError(
+                f"Starting snapshot (exclusive) 
{self.from_snapshot_id_exclusive} is not a parent "
+                f"ancestor of end snapshot {to_snapshot_id}"
+            )
+
+        return self.from_snapshot_id_exclusive, to_snapshot_id
+
+
+class ManifestGroupPlanner:

Review Comment:
   Motivated by Java's 
[`ManifestGroup`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/ManifestGroup.java)
 — both `DataScan` and `IncrementalAppendScan` need to plan file scan tasks 
from a set of manifests with optional filtering, and this is the natural shape 
for that ([prior 
thinking](https://github.com/apache/iceberg-python/pull/2031#discussion_r2102779860)).
 All the `_build_*` helpers and `_check_sequence_number` are **moved** from 
`DataScan`, not new.



##########
pyiceberg/table/__init__.py:
##########
@@ -2103,116 +2189,346 @@ def plan_files(self) -> Iterable[FileScanTask]:
             return self._plan_files_server_side()
         return self._plan_files_local()
 
-    def to_arrow(self) -> pa.Table:
-        """Read an Arrow table eagerly from this DataScan.
+    def count(self) -> int:
+        from pyiceberg.io.pyarrow import ArrowScan
 
-        All rows will be loaded into memory at once.
+        # Usage: Calculates the total number of records in a Scan that haven't 
had positional deletes.
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
 
-        Returns:
-            pa.Table: Materialized Arrow Table from the Iceberg table's 
DataScan
-        """
-        from pyiceberg.io.pyarrow import ArrowScan
+        for task in tasks:
+            # task.residual is a Boolean Expression if the filter condition is 
fully satisfied by the
+            # partition value and task.delete_files represents that positional 
delete haven't been merged yet
+            # hence those files have to read as a pyarrow table applying the 
filter and deletes
+            if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+                # Every File has a metadata stat that stores the file record 
count
+                res += task.file.record_count
+            else:
+                arrow_scan = ArrowScan(
+                    table_metadata=self.table_metadata,
+                    io=self.io,
+                    projected_schema=self.projection(),
+                    row_filter=self.row_filter,
+                    case_sensitive=self.case_sensitive,
+                )
+                tbl = arrow_scan.to_table([task])
+                res += len(tbl)
+        return res
 
-        return ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, 
self.case_sensitive, self.limit
-        ).to_table(self.plan_files())
 
-    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
-        """Return an Arrow RecordBatchReader from this DataScan.
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
 
-        For large results, using a RecordBatchReader requires less memory than
-        loading an Arrow Table for the same DataScan, because a RecordBatch
-        is read one at a time.
 
-        Returns:
-            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg 
table's DataScan
-                which can be used to read a stream of record batches one by 
one.
-        """
-        import pyarrow as pa
+class IncrementalAppendScan(BaseScan):
+    """An incremental scan of a table's data that accumulates appended data 
between two snapshots.
 
-        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+        from_snapshot_id_exclusive:
+            Optional ID of the "from" snapshot, to start the incremental scan 
from, exclusively. When the scan is
+            ultimately planned, this must not be None. The snapshot does not 
need to be present in the table metadata
+            (it may have been expired), as long as it is the parent of some 
ancestor of the "to" snapshot.
+        to_snapshot_id_inclusive:
+            Optional ID of the "to" snapshot, to end the incremental scan at, 
inclusively.
+            Omitting it will default to the table's current snapshot.
+    """
 
-        target_schema = schema_to_pyarrow(self.projection())
-        batches = ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, 
self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files())
+    from_snapshot_id_exclusive: int | None
+    to_snapshot_id_inclusive: int | None
 
-        return pa.RecordBatchReader.from_batches(
-            target_schema,
-            batches,
-        ).cast(target_schema)
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: str | BooleanExpression = ALWAYS_TRUE,
+        selected_fields: tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+        limit: int | None = None,
+        from_snapshot_id_exclusive: int | None = None,
+        to_snapshot_id_inclusive: int | None = None,
+    ):
+        super().__init__(
+            table_metadata=table_metadata,
+            io=io,
+            row_filter=row_filter,
+            selected_fields=selected_fields,
+            case_sensitive=case_sensitive,
+            options=options,
+            limit=limit,
+        )
+        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+        self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
 
-    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
-        """Read a Pandas DataFrame eagerly from this Iceberg table.
+    def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int | 
None) -> IAS:
+        """Instructs this scan to look for changes starting from a particular 
snapshot (exclusive).
+
+        Args:
+            from_snapshot_id_exclusive: the start snapshot ID (exclusive)
 
         Returns:
-            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
+            this for method chaining
         """
-        return self.to_arrow().to_pandas(**kwargs)
+        return 
self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
 
-    def to_duckdb(self, table_name: str, connection: DuckDBPyConnection | None 
= None) -> DuckDBPyConnection:
-        """Shorthand for loading the Iceberg Table in DuckDB.
+    def to_snapshot_inclusive(self: IAS, to_snapshot_id_inclusive: int | None) 
-> IAS:
+        """Instructs this scan to look for changes up to a particular snapshot 
(inclusive).
+
+        Args:
+            to_snapshot_id_inclusive: the end snapshot ID (inclusive)
 
         Returns:
-            DuckDBPyConnection: In memory DuckDB connection with the Iceberg 
table.
+            this for method chaining
         """
-        import duckdb
+        return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
 
-        con = connection or duckdb.connect(database=":memory:")
-        con.register(table_name, self.to_arrow())
+    def projection(self) -> Schema:
+        current_schema = self.table_metadata.schema()
+        if "*" in self.selected_fields:
+            return current_schema
+        return current_schema.select(*self.selected_fields, 
case_sensitive=self.case_sensitive)
 
-        return con
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plans the relevant files added between the specified snapshots."""
+        from_snapshot_id, to_snapshot_id = 
self._validate_and_resolve_snapshots()
+
+        append_snapshots = [
+            snapshot
+            for snapshot in ancestors_between_ids(
+                from_snapshot_id_exclusive=from_snapshot_id,
+                to_snapshot_id_inclusive=to_snapshot_id,
+                table_metadata=self.table_metadata,
+            )
+            if snapshot.summary is not None and snapshot.summary.operation == 
Operation.APPEND
+        ]
+        if len(append_snapshots) == 0:
+            return iter([])
 
-    def to_ray(self) -> ray.data.dataset.Dataset:
-        """Read a Ray Dataset eagerly from this Iceberg table.
+        append_snapshot_ids = {snapshot.snapshot_id for snapshot in 
append_snapshots}
 
-        Returns:
-            ray.data.dataset.Dataset: Materialized Ray Dataset from the 
Iceberg table
-        """
-        import ray
+        manifests = list(
+            {
+                manifest_file
+                for snapshot in append_snapshots
+                for manifest_file in snapshot.manifests(self.io)
+                if manifest_file.content == ManifestContent.DATA and 
manifest_file.added_snapshot_id in append_snapshot_ids
+            }
+        )
 
-        return ray.data.from_arrow(self.to_arrow())
+        return ManifestGroupPlanner(
+            table_metadata=self.table_metadata,
+            io=self.io,
+            row_filter=self.row_filter,
+            case_sensitive=self.case_sensitive,
+            options=self.options,
+        ).plan_files(
+            manifests=manifests,
+            manifest_entry_filter=lambda manifest_entry: 
manifest_entry.snapshot_id in append_snapshot_ids
+            and manifest_entry.status == ManifestEntryStatus.ADDED,
+        )
 
-    def to_polars(self) -> pl.DataFrame:
-        """Read a Polars DataFrame from this Iceberg table.
+    def _validate_and_resolve_snapshots(self) -> tuple[int, int]:
+        if self.from_snapshot_id_exclusive is None:
+            raise ValueError("Start snapshot is not set, please set 
from_snapshot_id_exclusive")
 
-        Returns:
-            pl.DataFrame: Materialized Polars Dataframe from the Iceberg table
+        if self.to_snapshot_id_inclusive is None:
+            current_snapshot = self.table_metadata.current_snapshot()
+            if current_snapshot is None:
+                raise ValueError("End snapshot is not set and table has no 
current snapshot")
+            to_snapshot_id = current_snapshot.snapshot_id
+        else:
+            if 
self.table_metadata.snapshot_by_id(self.to_snapshot_id_inclusive) is None:
+                raise ValueError(f"End snapshot not found in table metadata: 
{self.to_snapshot_id_inclusive}")
+            to_snapshot_id = self.to_snapshot_id_inclusive
+
+        # The start snapshot is exclusive, so it does not need to be present 
in the table metadata
+        # (it may have been expired). It is valid as long as it is the parent 
of some ancestor of
+        # the end snapshot.
+        if not is_parent_ancestor_of(to_snapshot_id, 
self.from_snapshot_id_exclusive, self.table_metadata):
+            raise ValueError(
+                f"Starting snapshot (exclusive) 
{self.from_snapshot_id_exclusive} is not a parent "
+                f"ancestor of end snapshot {to_snapshot_id}"
+            )
+
+        return self.from_snapshot_id_exclusive, to_snapshot_id
+
+
+class ManifestGroupPlanner:
+    """Plans the scan tasks for a group of manifests."""
+
+    table_metadata: TableMetadata
+    io: FileIO
+    row_filter: BooleanExpression
+    case_sensitive: bool
+    options: Properties
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: str | BooleanExpression = ALWAYS_TRUE,
+        case_sensitive: bool = True,
+        options: Properties = EMPTY_DICT,
+    ):
+        self.table_metadata = table_metadata
+        self.io = io
+        self.row_filter = _parse_row_filter(row_filter)
+        self.case_sensitive = case_sensitive
+        self.options = options
+
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
+
+    def plan_manifest_entries(self, manifests: Iterable[ManifestFile]) -> 
Iterator[list[ManifestEntry]]:
+        """Filter the given manifests using partition summaries and read the 
matching manifest entries.
+
+        For each manifest that passes the partition-summary filter, returns a 
list of its
+        manifest entries that match the partition and metrics evaluators. The 
returned iterator
+        yields one list per manifest (in parallel).
         """
-        import polars as pl
+        # step 1: filter manifests using partition summaries
+        # the filter depends on the partition spec used to write the manifest 
file, so create a cache of filters for each spec id
+        manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] = 
KeyDefaultDict(self._build_manifest_evaluator)
+        manifests = [
+            manifest_file for manifest_file in manifests if 
manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
+        ]
 
-        result = pl.from_arrow(self.to_arrow())
-        if isinstance(result, pl.Series):
-            result = result.to_frame()
+        # step 2: filter the data files in each manifest
+        # this filter depends on the partition spec used to write the manifest 
file
+        partition_evaluators: dict[int, Callable[[DataFile], bool]] = 
KeyDefaultDict(self._build_partition_evaluator)
+        min_sequence_number = _min_sequence_number(manifests)
 
-        return result
+        executor = ExecutorFactory.get_or_create()
+        return executor.map(
+            lambda args: _open_manifest(*args),

Review Comment:
   Extracted so both `DataScan.scan_plan_helper` (kept for back-compat / 
`inspect.py`) and `plan_files` below can share the partition-summary / per-file 
evaluator pipeline.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to