smaheshwar-pltr commented on code in PR #2031: URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2102769661
##########
pyiceberg/table/__init__.py:
##########
@@ -1688,102 +1887,252 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
         return set()
 
 
-class DataScan(TableScan):
-    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
-        return project(self.row_filter)
+class DataScan(FileBasedScan, TableScan):
+    """A scan of a table's data.
 
-    @cached_property
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        snapshot_id:
+            Optional Snapshot ID to time travel to. If None,
+            scans the table as of the current snapshot ID.
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+    """
+
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plans the relevant files by filtering on the PartitionSpecs.
+
+        Returns:
+            List of FileScanTasks that contain both data and delete files.
+        """
+        snapshot = self.snapshot()
+        if not snapshot:
+            return iter([])
+
+        return self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io))
+
+    # TODO: Document motivation and un-caching
+    @property
     def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
-        return KeyDefaultDict(self._build_partition_projection)
+        return self._manifest_group_planner.partition_filters
 
-    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
 
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
+A = TypeVar("A", bound="IncrementalScan", covariant=True)
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
 
-    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
-        schema = self.table_metadata.schema()
-        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
+class IncrementalScan(AbstractTableScan, ABC):

Review Comment:
   I'm differing from both https://github.com/apache/iceberg-python/pull/533 and the Java side here in keeping this class thin and not performing any snapshot validation in this abstract super-class. This is because of my claim in https://github.com/apache/iceberg-python/pull/2031#discussion_r2102674779 (happy to discuss): IMHO, the append and changelog scans should each perform their own, probably different, validation because we're designing these APIs for users, not engines.

   cc @chinmay-bhat, would love to hear your thoughts on this and the PR!