smaheshwar-pltr commented on code in PR #3512:
URL: https://github.com/apache/iceberg-python/pull/3512#discussion_r3417661033
##########
pyiceberg/table/__init__.py:
##########
@@ -1165,6 +1165,59 @@ def scan(
table_identifier=self._identifier,
)
+ def incremental_append_scan(
Review Comment:
[AI reviewer aid] New convenience method mirroring `Table.scan` ([naming
thought](https://github.com/apache/iceberg-python/pull/2031#discussion_r2102631306)).
Its arguments mirror `scan`, minus `snapshot_id` and plus the two snapshot-range arguments.
##########
pyiceberg/table/__init__.py:
##########
@@ -1165,6 +1165,59 @@ def scan(
table_identifier=self._identifier,
)
+ def incremental_append_scan(
+ self,
+ row_filter: str | BooleanExpression = ALWAYS_TRUE,
+ selected_fields: tuple[str, ...] = ("*",),
+ case_sensitive: bool = True,
+ from_snapshot_id_exclusive: int | None = None,
Review Comment:
[AI reviewer aid] Requiring `from_snapshot_id_exclusive` to be non-`None` at
plan time is a deliberate divergence from Java's [`IncrementalScan`
semantics](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/api/src/main/java/org/apache/iceberg/IncrementalScan.java#L53-L54)
(where the start defaults to the oldest ancestor of the end snapshot when not
configured). It follows Spark, where `start-snapshot-id` is required
([docs](https://iceberg.apache.org/docs/latest/spark-queries/#incremental-read)).
The reasoning is
[here](https://github.com/apache/iceberg-python/pull/2031#discussion_r2102674779).
TL;DR: an append scan only reads `append` snapshots, so "from the oldest
ancestor" would be misleading after a `replace`.
##########
pyiceberg/table/__init__.py:
##########
@@ -1668,6 +1721,18 @@ def scan(
) -> DataScan:
raise ValueError("Cannot scan a staged table")
+ def incremental_append_scan(
Review Comment:
[AI reviewer aid] Mirrors `StagedTable.scan` two lines up — staged tables
have no committed metadata to scan against.
##########
pyiceberg/table/__init__.py:
##########
@@ -2128,91 +2170,384 @@ def to_arrow_batch_reader(self) ->
pa.RecordBatchReader:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg
table's DataScan
which can be used to read a stream of record batches one by
one.
"""
- import pyarrow as pa
+ return _to_arrow_batch_reader_via_file_scan_tasks(self,
self.plan_files())
+
+ def count(self) -> int:
+ from pyiceberg.io.pyarrow import ArrowScan
+
+ # Usage: Calculates the total number of records in a Scan that haven't
had positional deletes.
+ res = 0
+ # every task is a FileScanTask
+ tasks = self.plan_files()
- from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+ for task in tasks:
+ # task.residual is a Boolean Expression if the filter condition is
fully satisfied by the
+ # partition value and task.delete_files represents that positional
delete haven't been merged yet
+ # hence those files have to read as a pyarrow table applying the
filter and deletes
+ if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+ # Every File has a metadata stat that stores the file record
count
+ res += task.file.record_count
+ else:
+ arrow_scan = ArrowScan(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ projected_schema=self.projection(),
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ )
+ tbl = arrow_scan.to_table([task])
+ res += len(tbl)
+ return res
- target_schema = schema_to_pyarrow(self.projection())
- batches = ArrowScan(
- self.table_metadata, self.io, self.projection(), self.row_filter,
self.case_sensitive, self.limit
- ).to_record_batches(self.plan_files())
- return pa.RecordBatchReader.from_batches(
- target_schema,
- batches,
- ).cast(target_schema)
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
- def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
- """Read a Pandas DataFrame eagerly from this Iceberg table.
+
+class IncrementalAppendScan(BaseScan):
Review Comment:
[AI reviewer aid] Mirrors Java's
[`IncrementalAppendScan`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/api/src/main/java/org/apache/iceberg/IncrementalAppendScan.java#L22)
interface and
[`BaseIncrementalAppendScan`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L31)
implementation. Only the append variant of `IncrementalScan` is implemented;
the changelog scan is out of scope here.
##########
pyiceberg/table/__init__.py:
##########
@@ -2128,91 +2170,384 @@ def to_arrow_batch_reader(self) ->
pa.RecordBatchReader:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg
table's DataScan
which can be used to read a stream of record batches one by
one.
"""
- import pyarrow as pa
+ return _to_arrow_batch_reader_via_file_scan_tasks(self,
self.plan_files())
+
+ def count(self) -> int:
+ from pyiceberg.io.pyarrow import ArrowScan
+
+ # Usage: Calculates the total number of records in a Scan that haven't
had positional deletes.
+ res = 0
+ # every task is a FileScanTask
+ tasks = self.plan_files()
- from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+ for task in tasks:
+ # task.residual is a Boolean Expression if the filter condition is
fully satisfied by the
+ # partition value and task.delete_files represents that positional
delete haven't been merged yet
+ # hence those files have to read as a pyarrow table applying the
filter and deletes
+ if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+ # Every File has a metadata stat that stores the file record
count
+ res += task.file.record_count
+ else:
+ arrow_scan = ArrowScan(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ projected_schema=self.projection(),
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ )
+ tbl = arrow_scan.to_table([task])
+ res += len(tbl)
+ return res
- target_schema = schema_to_pyarrow(self.projection())
- batches = ArrowScan(
- self.table_metadata, self.io, self.projection(), self.row_filter,
self.case_sensitive, self.limit
- ).to_record_batches(self.plan_files())
- return pa.RecordBatchReader.from_batches(
- target_schema,
- batches,
- ).cast(target_schema)
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
- def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
- """Read a Pandas DataFrame eagerly from this Iceberg table.
+
+class IncrementalAppendScan(BaseScan):
+ """An incremental scan of a table's data that accumulates appended data
between two snapshots.
+
+ Args:
+ row_filter:
+ A string or BooleanExpression that describes the
+ desired rows
+ selected_fields:
+ A tuple of strings representing the column names
+ to return in the output dataframe.
+ case_sensitive:
+ If True column matching is case sensitive
+ options:
+ Additional Table properties as a dictionary of
+ string key value pairs to use for this scan.
+ limit:
+ An integer representing the number of rows to
+ return in the scan result. If None, fetches all
+ matching rows.
+ from_snapshot_id_exclusive:
+ Optional ID of the "from" snapshot, to start the incremental scan
from, exclusively. When the scan is
+ ultimately planned, this must not be None. The snapshot does not
need to be present in the table metadata
+ (it may have been expired), as long as it is the parent of some
ancestor of the "to" snapshot.
+ to_snapshot_id_inclusive:
+ Optional ID of the "to" snapshot, to end the incremental scan at,
inclusively.
+ Omitting it will default to the table's current snapshot.
+ """
+
+ from_snapshot_id_exclusive: int | None
+ to_snapshot_id_inclusive: int | None
+
+ def __init__(
+ self,
+ table_metadata: TableMetadata,
+ io: FileIO,
+ row_filter: str | BooleanExpression = ALWAYS_TRUE,
+ selected_fields: tuple[str, ...] = ("*",),
+ case_sensitive: bool = True,
+ options: Properties = EMPTY_DICT,
+ limit: int | None = None,
+ from_snapshot_id_exclusive: int | None = None,
+ to_snapshot_id_inclusive: int | None = None,
+ ):
+ super().__init__(
+ table_metadata=table_metadata,
+ io=io,
+ row_filter=row_filter,
+ selected_fields=selected_fields,
+ case_sensitive=case_sensitive,
+ options=options,
+ limit=limit,
+ )
+ self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+ self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
+
+ def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int |
None) -> IAS:
Review Comment:
[AI reviewer aid] Maps to Java's
[`fromSnapshotExclusive(long)`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/api/src/main/java/org/apache/iceberg/IncrementalScan.java#L61).
We don't expose the `String ref` overload or `useBranch` — Spark passes raw
IDs anyway, and ref support can be added later without breaking anything.
##########
pyiceberg/table/__init__.py:
##########
@@ -2128,91 +2170,384 @@ def to_arrow_batch_reader(self) ->
pa.RecordBatchReader:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg
table's DataScan
which can be used to read a stream of record batches one by
one.
"""
- import pyarrow as pa
+ return _to_arrow_batch_reader_via_file_scan_tasks(self,
self.plan_files())
+
+ def count(self) -> int:
+ from pyiceberg.io.pyarrow import ArrowScan
+
+ # Usage: Calculates the total number of records in a Scan that haven't
had positional deletes.
+ res = 0
+ # every task is a FileScanTask
+ tasks = self.plan_files()
- from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+ for task in tasks:
+ # task.residual is a Boolean Expression if the filter condition is
fully satisfied by the
+ # partition value and task.delete_files represents that positional
delete haven't been merged yet
+ # hence those files have to read as a pyarrow table applying the
filter and deletes
+ if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+ # Every File has a metadata stat that stores the file record
count
+ res += task.file.record_count
+ else:
+ arrow_scan = ArrowScan(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ projected_schema=self.projection(),
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ )
+ tbl = arrow_scan.to_table([task])
+ res += len(tbl)
+ return res
- target_schema = schema_to_pyarrow(self.projection())
- batches = ArrowScan(
- self.table_metadata, self.io, self.projection(), self.row_filter,
self.case_sensitive, self.limit
- ).to_record_batches(self.plan_files())
- return pa.RecordBatchReader.from_batches(
- target_schema,
- batches,
- ).cast(target_schema)
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
- def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
- """Read a Pandas DataFrame eagerly from this Iceberg table.
+
+class IncrementalAppendScan(BaseScan):
+ """An incremental scan of a table's data that accumulates appended data
between two snapshots.
+
+ Args:
+ row_filter:
+ A string or BooleanExpression that describes the
+ desired rows
+ selected_fields:
+ A tuple of strings representing the column names
+ to return in the output dataframe.
+ case_sensitive:
+ If True column matching is case sensitive
+ options:
+ Additional Table properties as a dictionary of
+ string key value pairs to use for this scan.
+ limit:
+ An integer representing the number of rows to
+ return in the scan result. If None, fetches all
+ matching rows.
+ from_snapshot_id_exclusive:
+ Optional ID of the "from" snapshot, to start the incremental scan
from, exclusively. When the scan is
+ ultimately planned, this must not be None. The snapshot does not
need to be present in the table metadata
+ (it may have been expired), as long as it is the parent of some
ancestor of the "to" snapshot.
+ to_snapshot_id_inclusive:
+ Optional ID of the "to" snapshot, to end the incremental scan at,
inclusively.
+ Omitting it will default to the table's current snapshot.
+ """
+
+ from_snapshot_id_exclusive: int | None
+ to_snapshot_id_inclusive: int | None
+
+ def __init__(
+ self,
+ table_metadata: TableMetadata,
+ io: FileIO,
+ row_filter: str | BooleanExpression = ALWAYS_TRUE,
+ selected_fields: tuple[str, ...] = ("*",),
+ case_sensitive: bool = True,
+ options: Properties = EMPTY_DICT,
+ limit: int | None = None,
+ from_snapshot_id_exclusive: int | None = None,
+ to_snapshot_id_inclusive: int | None = None,
+ ):
+ super().__init__(
+ table_metadata=table_metadata,
+ io=io,
+ row_filter=row_filter,
+ selected_fields=selected_fields,
+ case_sensitive=case_sensitive,
+ options=options,
+ limit=limit,
+ )
+ self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+ self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
+
+ def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int |
None) -> IAS:
+ """Instructs this scan to look for changes starting from a particular
snapshot (exclusive).
+
+ Args:
+ from_snapshot_id_exclusive: the start snapshot ID (exclusive)
Returns:
- pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
+ this for method chaining
"""
- return self.to_arrow().to_pandas(**kwargs)
+ return
self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
- def to_duckdb(self, table_name: str, connection: DuckDBPyConnection | None
= None) -> DuckDBPyConnection:
- """Shorthand for loading the Iceberg Table in DuckDB.
+ def to_snapshot_inclusive(self: IAS, to_snapshot_id_inclusive: int | None)
-> IAS:
+ """Instructs this scan to look for changes up to a particular snapshot
(inclusive).
+
+ Args:
+ to_snapshot_id_inclusive: the end snapshot ID (inclusive)
Returns:
- DuckDBPyConnection: In memory DuckDB connection with the Iceberg
table.
+ this for method chaining
"""
- import duckdb
+ return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
- con = connection or duckdb.connect(database=":memory:")
- con.register(table_name, self.to_arrow())
+ def projection(self) -> Schema:
Review Comment:
[AI reviewer aid] Always uses the table's **current** schema, unlike
`TableScan.projection()` which uses the snapshot's schema when `snapshot_id` is
set. Matches Java:
[`BaseTable.newIncrementalAppendScan`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/BaseTable.java#L89-L92)
constructs the scan with `schema()`, and
[`BaseTable.schema()`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/BaseTable.java#L104-L107)
returns `ops.current().schema()`, which is the table's current schema rather than a
snapshot-bound one. C++ does the same:
[`TableScanBuilder::ResolveSnapshotSchema`](https://github.com/apache/iceberg-cpp/blob/fc80e4bdbafcd659e4b44fb9fb8ae7960a08c2d1/src/iceberg/table_scan.cc#L513-L526)
falls through to `metadata_->Schema()` for incremental scans (there is no `snapshot_id`
on the context). Rows in the range that were written under an older schema get NULL for
columns added later. This is covered by
`test_incremental_append_scan_schema_evolution_within_range`.
##########
pyiceberg/table/__init__.py:
##########
@@ -2128,91 +2170,384 @@ def to_arrow_batch_reader(self) ->
pa.RecordBatchReader:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg
table's DataScan
which can be used to read a stream of record batches one by
one.
"""
- import pyarrow as pa
+ return _to_arrow_batch_reader_via_file_scan_tasks(self,
self.plan_files())
+
+ def count(self) -> int:
+ from pyiceberg.io.pyarrow import ArrowScan
+
+ # Usage: Calculates the total number of records in a Scan that haven't
had positional deletes.
+ res = 0
+ # every task is a FileScanTask
+ tasks = self.plan_files()
- from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+ for task in tasks:
+ # task.residual is a Boolean Expression if the filter condition is
fully satisfied by the
+ # partition value and task.delete_files represents that positional
delete haven't been merged yet
+ # hence those files have to read as a pyarrow table applying the
filter and deletes
+ if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+ # Every File has a metadata stat that stores the file record
count
+ res += task.file.record_count
+ else:
+ arrow_scan = ArrowScan(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ projected_schema=self.projection(),
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ )
+ tbl = arrow_scan.to_table([task])
+ res += len(tbl)
+ return res
- target_schema = schema_to_pyarrow(self.projection())
- batches = ArrowScan(
- self.table_metadata, self.io, self.projection(), self.row_filter,
self.case_sensitive, self.limit
- ).to_record_batches(self.plan_files())
- return pa.RecordBatchReader.from_batches(
- target_schema,
- batches,
- ).cast(target_schema)
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
- def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
- """Read a Pandas DataFrame eagerly from this Iceberg table.
+
+class IncrementalAppendScan(BaseScan):
+ """An incremental scan of a table's data that accumulates appended data
between two snapshots.
+
+ Args:
+ row_filter:
+ A string or BooleanExpression that describes the
+ desired rows
+ selected_fields:
+ A tuple of strings representing the column names
+ to return in the output dataframe.
+ case_sensitive:
+ If True column matching is case sensitive
+ options:
+ Additional Table properties as a dictionary of
+ string key value pairs to use for this scan.
+ limit:
+ An integer representing the number of rows to
+ return in the scan result. If None, fetches all
+ matching rows.
+ from_snapshot_id_exclusive:
+ Optional ID of the "from" snapshot, to start the incremental scan
from, exclusively. When the scan is
+ ultimately planned, this must not be None. The snapshot does not
need to be present in the table metadata
+ (it may have been expired), as long as it is the parent of some
ancestor of the "to" snapshot.
+ to_snapshot_id_inclusive:
+ Optional ID of the "to" snapshot, to end the incremental scan at,
inclusively.
+ Omitting it will default to the table's current snapshot.
+ """
+
+ from_snapshot_id_exclusive: int | None
+ to_snapshot_id_inclusive: int | None
+
+ def __init__(
+ self,
+ table_metadata: TableMetadata,
+ io: FileIO,
+ row_filter: str | BooleanExpression = ALWAYS_TRUE,
+ selected_fields: tuple[str, ...] = ("*",),
+ case_sensitive: bool = True,
+ options: Properties = EMPTY_DICT,
+ limit: int | None = None,
+ from_snapshot_id_exclusive: int | None = None,
+ to_snapshot_id_inclusive: int | None = None,
+ ):
+ super().__init__(
+ table_metadata=table_metadata,
+ io=io,
+ row_filter=row_filter,
+ selected_fields=selected_fields,
+ case_sensitive=case_sensitive,
+ options=options,
+ limit=limit,
+ )
+ self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+ self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
+
+ def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int |
None) -> IAS:
+ """Instructs this scan to look for changes starting from a particular
snapshot (exclusive).
+
+ Args:
+ from_snapshot_id_exclusive: the start snapshot ID (exclusive)
Returns:
- pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
+ this for method chaining
"""
- return self.to_arrow().to_pandas(**kwargs)
+ return
self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
- def to_duckdb(self, table_name: str, connection: DuckDBPyConnection | None
= None) -> DuckDBPyConnection:
- """Shorthand for loading the Iceberg Table in DuckDB.
+ def to_snapshot_inclusive(self: IAS, to_snapshot_id_inclusive: int | None)
-> IAS:
+ """Instructs this scan to look for changes up to a particular snapshot
(inclusive).
+
+ Args:
+ to_snapshot_id_inclusive: the end snapshot ID (inclusive)
Returns:
- DuckDBPyConnection: In memory DuckDB connection with the Iceberg
table.
+ this for method chaining
"""
- import duckdb
+ return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
- con = connection or duckdb.connect(database=":memory:")
- con.register(table_name, self.to_arrow())
+ def projection(self) -> Schema:
+ current_schema = self.table_metadata.schema()
+ if "*" in self.selected_fields:
+ return current_schema
+ return current_schema.select(*self.selected_fields,
case_sensitive=self.case_sensitive)
- return con
+ def plan_files(self) -> Iterable[FileScanTask]:
Review Comment:
[AI reviewer aid] Mirrors Java's
[`BaseIncrementalAppendScan.doPlanFiles`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L46-L57)
and
[`appendFilesFromSnapshots`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L68-L99)
— walk ancestors, filter to `append` snapshots, dedup data manifests whose
`added_snapshot_id` is in range, then filter manifest entries by `(snapshot_id
in range, status == ADDED)`. Set semantics on the manifest dedup match the Java
[snippet](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L70-L74)
and rely on `ManifestFile.__eq__`/`__hash__` being defined (which they are on
`main` since #2233).
##########
pyiceberg/table/__init__.py:
##########
@@ -2128,91 +2170,384 @@ def to_arrow_batch_reader(self) ->
pa.RecordBatchReader:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg
table's DataScan
which can be used to read a stream of record batches one by
one.
"""
- import pyarrow as pa
+ return _to_arrow_batch_reader_via_file_scan_tasks(self,
self.plan_files())
+
+ def count(self) -> int:
+ from pyiceberg.io.pyarrow import ArrowScan
+
+ # Usage: Calculates the total number of records in a Scan that haven't
had positional deletes.
+ res = 0
+ # every task is a FileScanTask
+ tasks = self.plan_files()
- from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+ for task in tasks:
+ # task.residual is a Boolean Expression if the filter condition is
fully satisfied by the
+ # partition value and task.delete_files represents that positional
delete haven't been merged yet
+ # hence those files have to read as a pyarrow table applying the
filter and deletes
+ if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+ # Every File has a metadata stat that stores the file record
count
+ res += task.file.record_count
+ else:
+ arrow_scan = ArrowScan(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ projected_schema=self.projection(),
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ )
+ tbl = arrow_scan.to_table([task])
+ res += len(tbl)
+ return res
- target_schema = schema_to_pyarrow(self.projection())
- batches = ArrowScan(
- self.table_metadata, self.io, self.projection(), self.row_filter,
self.case_sensitive, self.limit
- ).to_record_batches(self.plan_files())
- return pa.RecordBatchReader.from_batches(
- target_schema,
- batches,
- ).cast(target_schema)
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
- def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
- """Read a Pandas DataFrame eagerly from this Iceberg table.
+
+class IncrementalAppendScan(BaseScan):
+ """An incremental scan of a table's data that accumulates appended data
between two snapshots.
+
+ Args:
+ row_filter:
+ A string or BooleanExpression that describes the
+ desired rows
+ selected_fields:
+ A tuple of strings representing the column names
+ to return in the output dataframe.
+ case_sensitive:
+ If True column matching is case sensitive
+ options:
+ Additional Table properties as a dictionary of
+ string key value pairs to use for this scan.
+ limit:
+ An integer representing the number of rows to
+ return in the scan result. If None, fetches all
+ matching rows.
+ from_snapshot_id_exclusive:
+ Optional ID of the "from" snapshot, to start the incremental scan
from, exclusively. When the scan is
+ ultimately planned, this must not be None. The snapshot does not
need to be present in the table metadata
+ (it may have been expired), as long as it is the parent of some
ancestor of the "to" snapshot.
+ to_snapshot_id_inclusive:
+ Optional ID of the "to" snapshot, to end the incremental scan at,
inclusively.
+ Omitting it will default to the table's current snapshot.
+ """
+
+ from_snapshot_id_exclusive: int | None
+ to_snapshot_id_inclusive: int | None
+
+ def __init__(
+ self,
+ table_metadata: TableMetadata,
+ io: FileIO,
+ row_filter: str | BooleanExpression = ALWAYS_TRUE,
+ selected_fields: tuple[str, ...] = ("*",),
+ case_sensitive: bool = True,
+ options: Properties = EMPTY_DICT,
+ limit: int | None = None,
+ from_snapshot_id_exclusive: int | None = None,
+ to_snapshot_id_inclusive: int | None = None,
+ ):
+ super().__init__(
+ table_metadata=table_metadata,
+ io=io,
+ row_filter=row_filter,
+ selected_fields=selected_fields,
+ case_sensitive=case_sensitive,
+ options=options,
+ limit=limit,
+ )
+ self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+ self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
+
+ def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int |
None) -> IAS:
+ """Instructs this scan to look for changes starting from a particular
snapshot (exclusive).
+
+ Args:
+ from_snapshot_id_exclusive: the start snapshot ID (exclusive)
Returns:
- pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
+ this for method chaining
"""
- return self.to_arrow().to_pandas(**kwargs)
+ return
self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
- def to_duckdb(self, table_name: str, connection: DuckDBPyConnection | None
= None) -> DuckDBPyConnection:
- """Shorthand for loading the Iceberg Table in DuckDB.
+ def to_snapshot_inclusive(self: IAS, to_snapshot_id_inclusive: int | None)
-> IAS:
+ """Instructs this scan to look for changes up to a particular snapshot
(inclusive).
+
+ Args:
+ to_snapshot_id_inclusive: the end snapshot ID (inclusive)
Returns:
- DuckDBPyConnection: In memory DuckDB connection with the Iceberg
table.
+ this for method chaining
"""
- import duckdb
+ return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
- con = connection or duckdb.connect(database=":memory:")
- con.register(table_name, self.to_arrow())
+ def projection(self) -> Schema:
+ current_schema = self.table_metadata.schema()
+ if "*" in self.selected_fields:
+ return current_schema
+ return current_schema.select(*self.selected_fields,
case_sensitive=self.case_sensitive)
- return con
+ def plan_files(self) -> Iterable[FileScanTask]:
+ """Plans the relevant files added between the specified snapshots."""
+ from_snapshot_id, to_snapshot_id =
self._validate_and_resolve_snapshots()
+
+ append_snapshots = [
+ snapshot
+ for snapshot in ancestors_between_ids(
+ from_snapshot_id_exclusive=from_snapshot_id,
+ to_snapshot_id_inclusive=to_snapshot_id,
+ table_metadata=self.table_metadata,
+ )
+ if snapshot.summary is not None and snapshot.summary.operation ==
Operation.APPEND
+ ]
+ if len(append_snapshots) == 0:
+ return iter([])
- def to_ray(self) -> ray.data.dataset.Dataset:
- """Read a Ray Dataset eagerly from this Iceberg table.
+ append_snapshot_ids = {snapshot.snapshot_id for snapshot in
append_snapshots}
+
+ manifests = list(
+ {
+ manifest_file
+ for snapshot in append_snapshots
+ for manifest_file in snapshot.manifests(self.io)
+ if manifest_file.content == ManifestContent.DATA and
manifest_file.added_snapshot_id in append_snapshot_ids
+ }
+ )
+
+ return ManifestGroupPlanner(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ options=self.options,
+ ).plan_files(
+ manifests=manifests,
+ manifest_entry_filter=lambda manifest_entry:
manifest_entry.snapshot_id in append_snapshot_ids
+ and manifest_entry.status == ManifestEntryStatus.ADDED,
+ )
+
+ def to_arrow(self) -> pa.Table:
+ """Read an Arrow table eagerly from this IncrementalAppendScan.
+
+ All rows will be loaded into memory at once.
Returns:
- ray.data.dataset.Dataset: Materialized Ray Dataset from the
Iceberg table
+ pa.Table: Materialized Arrow Table from the Iceberg table's
IncrementalAppendScan
"""
- import ray
+ return _to_arrow_via_file_scan_tasks(self, self.plan_files())
- return ray.data.from_arrow(self.to_arrow())
+ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
+ """Return an Arrow RecordBatchReader from this IncrementalAppendScan.
- def to_polars(self) -> pl.DataFrame:
- """Read a Polars DataFrame from this Iceberg table.
+ For large results, using a RecordBatchReader requires less memory than
+ loading an Arrow Table for the same IncrementalAppendScan, because a
+ RecordBatch is read one at a time.
Returns:
- pl.DataFrame: Materialized Polars Dataframe from the Iceberg table
+ pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg
table's IncrementalAppendScan
+ which can be used to read a stream of record batches one by
one.
"""
- import polars as pl
+ return _to_arrow_batch_reader_via_file_scan_tasks(self,
self.plan_files())
- result = pl.from_arrow(self.to_arrow())
- if isinstance(result, pl.Series):
- result = result.to_frame()
+ def _validate_and_resolve_snapshots(self) -> tuple[int, int]:
Review Comment:
[AI reviewer aid] Two semantic notes:
1. `from` (exclusive) is validated via `is_parent_ancestor_of`, not
`is_ancestor_of` — matches Java's
[`BaseIncrementalScan.fromSnapshotIdExclusive`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/BaseIncrementalScan.java#L177-L185)
(see the inline comment there about expiry) and C++'s
[`internal::FromSnapshotIdExclusive`](https://github.com/apache/iceberg-cpp/blob/fc80e4bdbafcd659e4b44fb9fb8ae7960a08c2d1/src/iceberg/table_scan.cc#L249-L259).
This admits cursors whose `from` snapshot has since been expired (the canonical
incremental-ingestion pattern), while fabricated IDs are still rejected.
2. Equal `from`/`to` IDs raise an error (a snapshot is never its own parent ancestor),
again matching Java/C++.
##########
pyiceberg/table/__init__.py:
##########
@@ -2128,91 +2170,384 @@ def to_arrow_batch_reader(self) ->
pa.RecordBatchReader:
pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg
table's DataScan
which can be used to read a stream of record batches one by
one.
"""
- import pyarrow as pa
+ return _to_arrow_batch_reader_via_file_scan_tasks(self,
self.plan_files())
+
+ def count(self) -> int:
+ from pyiceberg.io.pyarrow import ArrowScan
+
+ # Usage: Calculates the total number of records in a Scan that haven't
had positional deletes.
+ res = 0
+ # every task is a FileScanTask
+ tasks = self.plan_files()
- from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+ for task in tasks:
+ # task.residual is a Boolean Expression if the filter condition is
fully satisfied by the
+ # partition value and task.delete_files represents that positional
delete haven't been merged yet
+ # hence those files have to read as a pyarrow table applying the
filter and deletes
+ if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+ # Every File has a metadata stat that stores the file record
count
+ res += task.file.record_count
+ else:
+ arrow_scan = ArrowScan(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ projected_schema=self.projection(),
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ )
+ tbl = arrow_scan.to_table([task])
+ res += len(tbl)
+ return res
- target_schema = schema_to_pyarrow(self.projection())
- batches = ArrowScan(
- self.table_metadata, self.io, self.projection(), self.row_filter,
self.case_sensitive, self.limit
- ).to_record_batches(self.plan_files())
- return pa.RecordBatchReader.from_batches(
- target_schema,
- batches,
- ).cast(target_schema)
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
- def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
- """Read a Pandas DataFrame eagerly from this Iceberg table.
+
+class IncrementalAppendScan(BaseScan):
+ """An incremental scan of a table's data that accumulates appended data
between two snapshots.
+
+ Args:
+ row_filter:
+ A string or BooleanExpression that describes the
+ desired rows
+ selected_fields:
+ A tuple of strings representing the column names
+ to return in the output dataframe.
+ case_sensitive:
+ If True column matching is case sensitive
+ options:
+ Additional Table properties as a dictionary of
+ string key value pairs to use for this scan.
+ limit:
+ An integer representing the number of rows to
+ return in the scan result. If None, fetches all
+ matching rows.
+ from_snapshot_id_exclusive:
+ Optional ID of the "from" snapshot, to start the incremental scan
from, exclusively. When the scan is
+ ultimately planned, this must not be None. The snapshot does not
need to be present in the table metadata
+ (it may have been expired), as long as it is the parent of some
ancestor of the "to" snapshot.
+ to_snapshot_id_inclusive:
+ Optional ID of the "to" snapshot, to end the incremental scan at,
inclusively.
+ Omitting it will default to the table's current snapshot.
+ """
+
+ from_snapshot_id_exclusive: int | None
+ to_snapshot_id_inclusive: int | None
+
+ def __init__(
+ self,
+ table_metadata: TableMetadata,
+ io: FileIO,
+ row_filter: str | BooleanExpression = ALWAYS_TRUE,
+ selected_fields: tuple[str, ...] = ("*",),
+ case_sensitive: bool = True,
+ options: Properties = EMPTY_DICT,
+ limit: int | None = None,
+ from_snapshot_id_exclusive: int | None = None,
+ to_snapshot_id_inclusive: int | None = None,
+ ):
+ super().__init__(
+ table_metadata=table_metadata,
+ io=io,
+ row_filter=row_filter,
+ selected_fields=selected_fields,
+ case_sensitive=case_sensitive,
+ options=options,
+ limit=limit,
+ )
+ self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+ self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
+
+ def from_snapshot_exclusive(self: IAS, from_snapshot_id_exclusive: int |
None) -> IAS:
+ """Instructs this scan to look for changes starting from a particular
snapshot (exclusive).
+
+ Args:
+ from_snapshot_id_exclusive: the start snapshot ID (exclusive)
Returns:
- pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
+ this for method chaining
"""
- return self.to_arrow().to_pandas(**kwargs)
+ return
self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
- def to_duckdb(self, table_name: str, connection: DuckDBPyConnection | None
= None) -> DuckDBPyConnection:
- """Shorthand for loading the Iceberg Table in DuckDB.
+ def to_snapshot_inclusive(self: IAS, to_snapshot_id_inclusive: int | None)
-> IAS:
+ """Instructs this scan to look for changes up to a particular snapshot
(inclusive).
+
+ Args:
+ to_snapshot_id_inclusive: the end snapshot ID (inclusive)
Returns:
- DuckDBPyConnection: In memory DuckDB connection with the Iceberg
table.
+ this for method chaining
"""
- import duckdb
+ return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
- con = connection or duckdb.connect(database=":memory:")
- con.register(table_name, self.to_arrow())
+ def projection(self) -> Schema:
+ current_schema = self.table_metadata.schema()
+ if "*" in self.selected_fields:
+ return current_schema
+ return current_schema.select(*self.selected_fields,
case_sensitive=self.case_sensitive)
- return con
+ def plan_files(self) -> Iterable[FileScanTask]:
+ """Plans the relevant files added between the specified snapshots."""
+ from_snapshot_id, to_snapshot_id =
self._validate_and_resolve_snapshots()
+
+ append_snapshots = [
+ snapshot
+ for snapshot in ancestors_between_ids(
+ from_snapshot_id_exclusive=from_snapshot_id,
+ to_snapshot_id_inclusive=to_snapshot_id,
+ table_metadata=self.table_metadata,
+ )
+ if snapshot.summary is not None and snapshot.summary.operation ==
Operation.APPEND
+ ]
+ if len(append_snapshots) == 0:
+ return iter([])
- def to_ray(self) -> ray.data.dataset.Dataset:
- """Read a Ray Dataset eagerly from this Iceberg table.
+ append_snapshot_ids = {snapshot.snapshot_id for snapshot in
append_snapshots}
+
+ manifests = list(
+ {
+ manifest_file
+ for snapshot in append_snapshots
+ for manifest_file in snapshot.manifests(self.io)
+ if manifest_file.content == ManifestContent.DATA and
manifest_file.added_snapshot_id in append_snapshot_ids
+ }
+ )
+
+ return ManifestGroupPlanner(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ options=self.options,
+ ).plan_files(
+ manifests=manifests,
+ manifest_entry_filter=lambda manifest_entry:
manifest_entry.snapshot_id in append_snapshot_ids
+ and manifest_entry.status == ManifestEntryStatus.ADDED,
+ )
+
+ def to_arrow(self) -> pa.Table:
+ """Read an Arrow table eagerly from this IncrementalAppendScan.
+
+ All rows will be loaded into memory at once.
Returns:
- ray.data.dataset.Dataset: Materialized Ray Dataset from the
Iceberg table
+ pa.Table: Materialized Arrow Table from the Iceberg table's
IncrementalAppendScan
"""
- import ray
+ return _to_arrow_via_file_scan_tasks(self, self.plan_files())
- return ray.data.from_arrow(self.to_arrow())
+ def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
+ """Return an Arrow RecordBatchReader from this IncrementalAppendScan.
- def to_polars(self) -> pl.DataFrame:
- """Read a Polars DataFrame from this Iceberg table.
+ For large results, using a RecordBatchReader requires less memory than
+ loading an Arrow Table for the same IncrementalAppendScan, because a
+ RecordBatch is read one at a time.
Returns:
- pl.DataFrame: Materialized Polars Dataframe from the Iceberg table
+ pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg
table's IncrementalAppendScan
+ which can be used to read a stream of record batches one by
one.
"""
- import polars as pl
+ return _to_arrow_batch_reader_via_file_scan_tasks(self,
self.plan_files())
- result = pl.from_arrow(self.to_arrow())
- if isinstance(result, pl.Series):
- result = result.to_frame()
+ def _validate_and_resolve_snapshots(self) -> tuple[int, int]:
+ if self.from_snapshot_id_exclusive is None:
+ raise ValueError("Start snapshot is not set, please set
from_snapshot_id_exclusive")
- return result
+ if self.to_snapshot_id_inclusive is None:
+ current_snapshot = self.table_metadata.current_snapshot()
+ if current_snapshot is None:
+ raise ValueError("End snapshot is not set and table has no
current snapshot")
+ to_snapshot_id = current_snapshot.snapshot_id
+ else:
+ if
self.table_metadata.snapshot_by_id(self.to_snapshot_id_inclusive) is None:
+ raise ValueError(f"End snapshot not found in table metadata:
{self.to_snapshot_id_inclusive}")
+ to_snapshot_id = self.to_snapshot_id_inclusive
+
+ # The start snapshot is exclusive, so it does not need to be present
in the table metadata
+ # (it may have been expired). It is valid as long as it is the parent
of some ancestor of
+ # the end snapshot.
+ if not is_parent_ancestor_of(to_snapshot_id,
self.from_snapshot_id_exclusive, self.table_metadata):
+ raise ValueError(
+ f"Starting snapshot (exclusive)
{self.from_snapshot_id_exclusive} is not a parent "
+ f"ancestor of end snapshot {to_snapshot_id}"
+ )
- def count(self) -> int:
- from pyiceberg.io.pyarrow import ArrowScan
+ return self.from_snapshot_id_exclusive, to_snapshot_id
- # Usage: Calculates the total number of records in a Scan that haven't
had positional deletes.
- res = 0
- # every task is a FileScanTask
- tasks = self.plan_files()
- for task in tasks:
- # task.residual is a Boolean Expression if the filter condition is
fully satisfied by the
- # partition value and task.delete_files represents that positional
delete haven't been merged yet
- # hence those files have to read as a pyarrow table applying the
filter and deletes
- if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
- # Every File has a metadata stat that stores the file record
count
- res += task.file.record_count
- else:
- arrow_scan = ArrowScan(
- table_metadata=self.table_metadata,
- io=self.io,
- projected_schema=self.projection(),
- row_filter=self.row_filter,
- case_sensitive=self.case_sensitive,
+class ManifestGroupPlanner:
+ """Plans the scan tasks for a group of manifests."""
+
+ table_metadata: TableMetadata
+ io: FileIO
+ row_filter: BooleanExpression
+ case_sensitive: bool
+ options: Properties
+
+ def __init__(
+ self,
+ table_metadata: TableMetadata,
+ io: FileIO,
+ row_filter: str | BooleanExpression = ALWAYS_TRUE,
+ case_sensitive: bool = True,
+ options: Properties = EMPTY_DICT,
+ ):
+ self.table_metadata = table_metadata
+ self.io = io
+ self.row_filter = _parse_row_filter(row_filter)
+ self.case_sensitive = case_sensitive
+ self.options = options
+
+ @cached_property
+ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+ return KeyDefaultDict(self._build_partition_projection)
+
+ def plan_manifest_entries(self, manifests: Iterable[ManifestFile]) ->
Iterator[list[ManifestEntry]]:
+ """Filter the given manifests using partition summaries and read the
matching manifest entries.
+
+ For each manifest that passes the partition-summary filter, returns a
list of its
+ manifest entries that match the partition and metrics evaluators. The
returned iterator
+ yields one list per manifest (in parallel).
+ """
+ # step 1: filter manifests using partition summaries
+ # the filter depends on the partition spec used to write the manifest
file, so create a cache of filters for each spec id
+ manifest_evaluators: dict[int, Callable[[ManifestFile], bool]] =
KeyDefaultDict(self._build_manifest_evaluator)
+ manifests = [
+ manifest_file for manifest_file in manifests if
manifest_evaluators[manifest_file.partition_spec_id](manifest_file)
+ ]
+
+ # step 2: filter the data files in each manifest
+ # this filter depends on the partition spec used to write the manifest
file
+ partition_evaluators: dict[int, Callable[[DataFile], bool]] =
KeyDefaultDict(self._build_partition_evaluator)
+ min_sequence_number = _min_sequence_number(manifests)
+
+ executor = ExecutorFactory.get_or_create()
+ return executor.map(
+ lambda args: _open_manifest(*args),
+ [
+ (
+ self.io,
+ manifest,
+ partition_evaluators[manifest.partition_spec_id],
+ self._build_metrics_evaluator(),
)
- tbl = arrow_scan.to_table([task])
- res += len(tbl)
- return res
+ for manifest in manifests
+ if self._check_sequence_number(min_sequence_number, manifest)
+ ],
+ )
+
+ def plan_files(
+ self,
+ manifests: Iterable[ManifestFile],
+ manifest_entry_filter: Callable[[ManifestEntry], bool] = lambda _:
True,
Review Comment:
[AI reviewer aid] The `manifest_entry_filter` parameter is new. It was introduced for the
append-scan logic, which must skip manifest entries that were not added by one of the in-range append snapshots.
##########
pyiceberg/table/snapshots.py:
##########
@@ -431,6 +431,46 @@ def ancestors_between(from_snapshot: Snapshot | None,
to_snapshot: Snapshot, tab
yield from ancestors_of(to_snapshot, table_metadata)
+def ancestors_between_ids(
Review Comment:
[AI reviewer aid] Mirrors Java's
[`SnapshotUtil.ancestorsBetween`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L216-L229).
Differs from the existing `ancestors_between` (snapshot-based,
inclusive-inclusive) above by taking IDs and being exclusive-inclusive, to
match the incremental-scan validation pattern. Raises if
`to_snapshot_id_inclusive` is missing from metadata, mirroring Java.
##########
pyiceberg/table/snapshots.py:
##########
@@ -431,6 +431,46 @@ def ancestors_between(from_snapshot: Snapshot | None,
to_snapshot: Snapshot, tab
yield from ancestors_of(to_snapshot, table_metadata)
+def ancestors_between_ids(
+ from_snapshot_id_exclusive: int | None,
+ to_snapshot_id_inclusive: int,
+ table_metadata: TableMetadata,
+) -> Iterable[Snapshot]:
+ """Get the ancestors of and including the given "to" snapshot, up to but
not including the "from" snapshot.
+
+ If ``from_snapshot_id_exclusive`` is None, all ancestors of the "to"
snapshot are returned.
+
+ Raises:
+ ValueError: if ``to_snapshot_id_inclusive`` is not present in the
table metadata.
+ """
+ to_snapshot = table_metadata.snapshot_by_id(to_snapshot_id_inclusive)
+ if to_snapshot is None:
+ raise ValueError(f"Cannot find snapshot: {to_snapshot_id_inclusive}")
+
+ if from_snapshot_id_exclusive is not None:
+ for snapshot in ancestors_of(to_snapshot, table_metadata):
+ if snapshot.snapshot_id == from_snapshot_id_exclusive:
+ break
+ yield snapshot
+ else:
+ yield from ancestors_of(to_snapshot, table_metadata)
+
+
+def is_parent_ancestor_of(snapshot_id: int, ancestor_parent_snapshot_id: int,
table_metadata: TableMetadata) -> bool:
Review Comment:
[AI reviewer aid] Mirrors Java's
[`SnapshotUtil.isParentAncestorOf`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L77-L86),
including the [`Cannot find
snapshot`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/util/SnapshotUtil.java#L148-L154)
raise on missing snapshot (Java throws one hop down, via `ancestorsOf(long,
lookup)`).
##########
tests/integration/test_reads.py:
##########
@@ -1272,3 +1272,152 @@ def test_scan_source_field_missing_in_spec(catalog:
Catalog, spark: SparkSession
table = catalog.load_table(identifier)
assert len(list(table.scan().plan_files())) == 3
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"),
lf("session_catalog")])
+def test_incremental_append_scan_append_only(catalog: Catalog) -> None:
+ test_table = catalog.load_table("default.test_incremental_read")
+
+ scan = (
+ test_table.incremental_append_scan()
+ .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+ .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+ )
+
+ # snapshots[1] adds 1 file (letter=b); snapshots[2] adds 2 files
(letter=b, letter=c).
+ assert len(list(scan.plan_files())) == 3
+ assert sorted(scan.to_arrow()["number"].to_pylist()) == [2, 3, 4]
+
+ # All read paths return the same rows.
+ assert len(scan.to_arrow_batch_reader().read_all()) == 3
+ assert len(scan.to_pandas()) == 3
+ assert len(scan.to_polars()) == 3
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"),
lf("session_catalog")])
+def test_incremental_append_scan_ignores_non_append_snapshots(catalog:
Catalog) -> None:
+ test_table = catalog.load_table("default.test_incremental_read")
+
+ # snapshots[3] is a delete. The append scan must ignore it.
+ scan = test_table.incremental_append_scan(
+ from_snapshot_id_exclusive=test_table.snapshots()[0].snapshot_id,
+ to_snapshot_id_inclusive=test_table.snapshots()[3].snapshot_id,
+ )
+ assert len(list(scan.plan_files())) == 3
+ assert sorted(scan.to_arrow()["number"].to_pylist()) == [2, 3, 4]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"),
lf("session_catalog")])
+def test_incremental_append_scan_empty_range(catalog: Catalog) -> None:
+ test_table = catalog.load_table("default.test_incremental_read")
+
+ # snapshots[3] is the only snapshot in the range and is a delete; the scan
must return empty.
+ scan = test_table.incremental_append_scan(
+ from_snapshot_id_exclusive=test_table.snapshots()[2].snapshot_id,
+ to_snapshot_id_inclusive=test_table.snapshots()[3].snapshot_id,
+ )
+ assert list(scan.plan_files()) == []
+ assert len(scan.to_arrow()) == 0
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"),
lf("session_catalog")])
+def test_incremental_append_scan_schema_evolution_within_range(catalog:
Catalog) -> None:
+ test_table = catalog.load_table("default.test_incremental_read")
+
+ # snapshots[1..2] are on the original schema (number, letter);
snapshots[4] is on the evolved
+ # schema (number, letter, extra) after ALTER TABLE ADD COLUMN. The scan
must project the older
+ # rows onto the current schema (extra -> null) and pick up the new value
for the newer row.
+ scan = (
+ test_table.incremental_append_scan()
+ .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+ .to_snapshot_inclusive(test_table.snapshots()[4].snapshot_id)
+ )
+ assert len(list(scan.plan_files())) == 4
+
+ expected_schema = pa.schema([pa.field("number", pa.int32()),
pa.field("letter", pa.string()), pa.field("extra", pa.int32())])
+ result_table = scan.to_arrow()
+ assert result_table.schema.equals(expected_schema)
+ rows = zip(
+ result_table["number"].to_pylist(),
+ result_table["letter"].to_pylist(),
+ result_table["extra"].to_pylist(),
+ strict=True,
+ )
+ assert sorted(rows, key=lambda r: r[0]) == [(2, "b", None), (3, "c",
None), (4, "b", None), (5, "d", 100)]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"),
lf("session_catalog")])
+def test_incremental_append_scan_partition_pruning(catalog: Catalog) -> None:
+ test_table = catalog.load_table("default.test_incremental_read")
+
+ # `letter=c` only appears in snapshots[2]. The manifest evaluator rejects
snapshots[1]'s
+ # manifest (letter=b only); the partition evaluator rejects the letter=b
entry in
+ # snapshots[2]'s manifest. One file remains.
+ scan = (
+ test_table.incremental_append_scan(row_filter=EqualTo("letter", "c"))
+ .from_snapshot_exclusive(test_table.snapshots()[0].snapshot_id)
+ .to_snapshot_inclusive(test_table.snapshots()[2].snapshot_id)
+ )
+ assert len(list(scan.plan_files())) == 1
+ assert scan.to_arrow()["number"].to_pylist() == [3]
+
+
+@pytest.mark.integration
+@pytest.mark.parametrize("catalog", [lf("session_catalog_hive"),
lf("session_catalog")])
+def test_incremental_append_scan_metrics_pruning(catalog: Catalog) -> None:
+ test_table = catalog.load_table("default.test_incremental_read")
+
+ # Non-partition predicate: the manifest/partition evaluators degenerate,
leaving the per-file
Review Comment:
[AI reviewer aid] Filters on a non-partition column (`number`), so the
manifest and partition evaluators degenerate to ALWAYS_TRUE and it's the
per-file metrics evaluator (column min/max/null stats) that must do all the
pruning. Covers a layer of `ManifestGroupPlanner` that the existing `DataScan`
integration coverage doesn't exercise end-to-end through a real scan.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]