smaheshwar-pltr commented on code in PR #3364:
URL: https://github.com/apache/iceberg-python/pull/3364#discussion_r3260044588
##########
pyiceberg/table/__init__.py:
##########
@@ -1917,131 +2090,42 @@ def _min_sequence_number(manifests: list[ManifestFile]) -> int:
class DataScan(TableScan):
- def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
- project = inclusive_projection(self.table_metadata.schema(),
self.table_metadata.specs()[spec_id], self.case_sensitive)
- return project(self.row_filter)
-
@cached_property
- def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
- return KeyDefaultDict(self._build_partition_projection)
+ def _manifest_planner(self) -> ManifestGroupPlanner:
Review Comment:
Cached so that the planner — and with it the planner's own `partition_filters`
`cached_property` — lives for the scan's lifetime. Without this, every access would
construct a fresh planner and discard its cache. This matches the pre-PR caching
behaviour on `DataScan`, where `partition_filters` was itself a `cached_property`.
##########
pyiceberg/table/__init__.py:
##########
@@ -1917,131 +2090,42 @@ def _min_sequence_number(manifests: list[ManifestFile]) -> int:
class DataScan(TableScan):
- def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
- project = inclusive_projection(self.table_metadata.schema(),
self.table_metadata.specs()[spec_id], self.case_sensitive)
- return project(self.row_filter)
-
@cached_property
- def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
- return KeyDefaultDict(self._build_partition_projection)
+ def _manifest_planner(self) -> ManifestGroupPlanner:
+ return ManifestGroupPlanner(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ options=self.options,
+ )
- def _build_manifest_evaluator(self, spec_id: int) ->
Callable[[ManifestFile], bool]:
- spec = self.table_metadata.specs()[spec_id]
- return manifest_evaluator(spec, self.table_metadata.schema(),
self.partition_filters[spec_id], self.case_sensitive)
+ @property
+ def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+ return self._manifest_planner.partition_filters
- def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile],
bool]:
- spec = self.table_metadata.specs()[spec_id]
- partition_type = spec.partition_type(self.table_metadata.schema())
- partition_schema = Schema(*partition_type.fields)
- partition_expr = self.partition_filters[spec_id]
+ def scan_plan_helper(self) -> Iterator[list[ManifestEntry]]:
Review Comment:
This method is public; its only call site within PyIceberg is `pyiceberg/table/inspect.py`.
It is kept for backward compatibility, since external library users may rely on it. Body now
delegates to `ManifestGroupPlanner.plan_manifest_entries` so the work isn't
duplicated with `IncrementalAppendScan`. ([Prior
context](https://github.com/apache/iceberg-python/pull/2031#discussion_r2102733224)
on whether the underscore-prefixed helpers needed a deprecation cycle —
they're gone now and aren't documented as supported.)
##########
pyiceberg/table/__init__.py:
##########
@@ -2103,116 +2189,346 @@ def plan_files(self) -> Iterable[FileScanTask]:
return self._plan_files_server_side()
return self._plan_files_local()
- def to_arrow(self) -> pa.Table:
- """Read an Arrow table eagerly from this DataScan.
+ def count(self) -> int:
+ from pyiceberg.io.pyarrow import ArrowScan
- All rows will be loaded into memory at once.
+ # Usage: Calculates the total number of records in a Scan that haven't
had positional deletes.
+ res = 0
+ # every task is a FileScanTask
+ tasks = self.plan_files()
- Returns:
- pa.Table: Materialized Arrow Table from the Iceberg table's
DataScan
- """
- from pyiceberg.io.pyarrow import ArrowScan
+ for task in tasks:
+ # task.residual is a Boolean Expression if the filter condition is
fully satisfied by the
+ # partition value and task.delete_files represents that positional
delete haven't been merged yet
+ # hence those files have to read as a pyarrow table applying the
filter and deletes
+ if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+ # Every File has a metadata stat that stores the file record
count
+ res += task.file.record_count
+ else:
+ arrow_scan = ArrowScan(
+ table_metadata=self.table_metadata,
+ io=self.io,
+ projected_schema=self.projection(),
+ row_filter=self.row_filter,
+ case_sensitive=self.case_sensitive,
+ )
+ tbl = arrow_scan.to_table([task])
+ res += len(tbl)
+ return res
- return ArrowScan(
- self.table_metadata, self.io, self.projection(), self.row_filter,
self.case_sensitive, self.limit
- ).to_table(self.plan_files())
- def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
- """Return an Arrow RecordBatchReader from this DataScan.
+IAS = TypeVar("IAS", bound="IncrementalAppendScan", covariant=True)
- For large results, using a RecordBatchReader requires less memory than
- loading an Arrow Table for the same DataScan, because a RecordBatch
- is read one at a time.
- Returns:
- pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg
table's DataScan
- which can be used to read a stream of record batches one by
one.
- """
- import pyarrow as pa
+class IncrementalAppendScan(BaseScan):
Review Comment:
Mirrors Java's
[`IncrementalAppendScan`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/api/src/main/java/org/apache/iceberg/IncrementalAppendScan.java#L22)
interface and
[`BaseIncrementalAppendScan`](https://github.com/apache/iceberg/blob/2f6606a247e2b16be46ca6c02fc4cfc2e17691e6/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L31)
implementation. Only the append variant of `IncrementalScan` is implemented here;
the changelog scan is out of scope.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]