smaheshwar-pltr commented on code in PR #2031:
URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2102769661
##########
pyiceberg/table/__init__.py:
##########
@@ -1688,102 +1887,252 @@ def _match_deletes_to_data_file(data_entry:
ManifestEntry, positional_delete_ent
return set()
-class DataScan(TableScan):
- def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
- project = inclusive_projection(self.table_metadata.schema(),
self.table_metadata.specs()[spec_id], self.case_sensitive)
- return project(self.row_filter)
+class DataScan(FileBasedScan, TableScan):
+ """A scan of a table's data.
- @cached_property
+ Args:
+ row_filter:
+ A string or BooleanExpression that describes the
+ desired rows
+ selected_fields:
+ A tuple of strings representing the column names
+ to return in the output dataframe.
+ case_sensitive:
+ If True column matching is case sensitive
+ snapshot_id:
+ Optional Snapshot ID to time travel to. If None,
+ scans the table as of the current snapshot ID.
+ options:
+ Additional Table properties as a dictionary of
+ string key value pairs to use for this scan.
+ limit:
+ An integer representing the number of rows to
+ return in the scan result. If None, fetches all
+ matching rows.
+ """
+
+ def plan_files(self) -> Iterable[FileScanTask]:
+ """Plans the relevant files by filtering on the PartitionSpecs.
+
+ Returns:
+ List of FileScanTasks that contain both data and delete files.
+ """
+ snapshot = self.snapshot()
+ if not snapshot:
+ return iter([])
+
+ return
self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io))
+
+ # TODO: Document motivation and un-caching
+ @property
def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
- return KeyDefaultDict(self._build_partition_projection)
+ return self._manifest_group_planner.partition_filters
- def _build_manifest_evaluator(self, spec_id: int) ->
Callable[[ManifestFile], bool]:
- spec = self.table_metadata.specs()[spec_id]
- return manifest_evaluator(spec, self.table_metadata.schema(),
self.partition_filters[spec_id], self.case_sensitive)
- def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile],
bool]:
- spec = self.table_metadata.specs()[spec_id]
- partition_type = spec.partition_type(self.table_metadata.schema())
- partition_schema = Schema(*partition_type.fields)
- partition_expr = self.partition_filters[spec_id]
+A = TypeVar("A", bound="IncrementalScan", covariant=True)
- # The lambda created here is run in multiple threads.
- # So we avoid creating _EvaluatorExpression methods bound to a single
- # shared instance across multiple threads.
- return lambda data_file: expression_evaluator(partition_schema,
partition_expr, self.case_sensitive)(data_file.partition)
- def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
- schema = self.table_metadata.schema()
- include_empty_files =
strtobool(self.options.get("include_empty_files", "false"))
+class IncrementalScan(AbstractTableScan, ABC):
Review Comment:
I'm differing from both https://github.com/apache/iceberg-python/pull/533
and the Java-side here in keeping this thin and not performing any snapshot
validation in this abstract super-class.
This is because of my claim in
https://github.com/apache/iceberg-python/pull/2031#discussion_r2102674779
(happy to discuss) — in my opinion, append and changelog scans should each
perform their own, likely different, validation, because we're designing these
APIs for user use rather than engine use.
cc @chinmay-bhat, would love to hear your thoughts on this and this PR!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]