smaheshwar-pltr commented on code in PR #2031:
URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2102779860
##########
pyiceberg/table/__init__.py:
##########
@@ -1688,102 +1887,252 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
         return set()
 
 
-class DataScan(TableScan):
-    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
-        return project(self.row_filter)
+class DataScan(FileBasedScan, TableScan):
+    """A scan of a table's data.
 
-    @cached_property
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        snapshot_id:
+            Optional Snapshot ID to time travel to. If None,
+            scans the table as of the current snapshot ID.
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+    """
+
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plans the relevant files by filtering on the PartitionSpecs.
+
+        Returns:
+            List of FileScanTasks that contain both data and delete files.
+        """
+        snapshot = self.snapshot()
+        if not snapshot:
+            return iter([])
+
+        return self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io))
+
+    # TODO: Document motivation and un-caching
+    @property
     def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
-        return KeyDefaultDict(self._build_partition_projection)
+        return self._manifest_group_planner.partition_filters
 
-    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
 
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
+A = TypeVar("A", bound="IncrementalScan", covariant=True)
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
 
-    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
-        schema = self.table_metadata.schema()
-        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
+class IncrementalScan(AbstractTableScan, ABC):
+    """A base class for incremental scans."""
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: _InclusiveMetricsEvaluator(
-            schema,
-            self.row_filter,
-            self.case_sensitive,
-            include_empty_files,
-        ).eval(data_file)
+    from_snapshot_id_exclusive: Optional[int]
+    to_snapshot_id_inclusive: Optional[int]
 
-    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
-        spec = self.table_metadata.specs()[spec_id]
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        from_snapshot_id_exclusive: Optional[int] = None,
+        to_snapshot_id_inclusive: Optional[int] = None,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+    ):
+        super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit)
+        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+        self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        # return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
-        from pyiceberg.expressions.visitors import residual_evaluator_of
+    def from_snapshot_exclusive(self: A, from_snapshot_id_exclusive: Optional[int]) -> A:
+        """Instructs this scan to look for changes starting from a particular snapshot (exclusive).
 
-        # assert self.row_filter == False
-        return lambda datafile: (
-            residual_evaluator_of(
-                spec=spec,
-                expr=self.row_filter,
-                case_sensitive=self.case_sensitive,
-                schema=self.table_metadata.schema(),
-            )
-        )
+        Args:
+            from_snapshot_id_exclusive: the start snapshot ID (exclusive)
 
-    def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
-        """Ensure that no manifests are loaded that contain deletes that are older than the data.
+        Returns:
+            this for method chaining
+        """
+        return self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
+
+    def to_snapshot_inclusive(self: A, to_snapshot_id_inclusive: Optional[int]) -> A:
+        """Instructs this scan to look for changes up to a particular snapshot (inclusive).
 
         Args:
-            min_sequence_number (int): The minimal sequence number.
-            manifest (ManifestFile): A ManifestFile that can be either data or deletes.
+            to_snapshot_id_inclusive: the end snapshot ID (inclusive)
 
         Returns:
-            Boolean indicating if it is either a data file, or a relevant delete file.
+            this for method chaining
         """
-        return manifest.content == ManifestContent.DATA or (
-            # Not interested in deletes that are older than the data
-            manifest.content == ManifestContent.DELETES
-            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
+        return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
+
+    def projection(self) -> Schema:
+        current_schema = self.table_metadata.schema()
+
+        if "*" in self.selected_fields:
+            return current_schema
+
+        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
+
+
+class IncrementalAppendScan(IncrementalScan, FileBasedScan):
+    """An incremental scan of a table's data that accumulates appended data between two snapshots.
+
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        from_snapshot_id_exclusive:
+            Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is
+            ultimately planned, this must not be None.
+        to_snapshot_id_inclusive:
+            Optional ID of the "to" snapshot, to end the incremental scan at, inclusively.
+            Omitting it will default to the table's current snapshot.
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        from_snapshot_id_exclusive: Optional[int] = None,
+        to_snapshot_id_inclusive: Optional[int] = None,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+    ):
+        super().__init__(
+            table_metadata,
+            io,
+            row_filter,
+            selected_fields,
+            case_sensitive,
+            from_snapshot_id_exclusive,
+            to_snapshot_id_inclusive,
+            options,
+            limit,
         )
 
     def plan_files(self) -> Iterable[FileScanTask]:
-        """Plans the relevant files by filtering on the PartitionSpecs.
+        from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots()
 
-        Returns:
-            List of FileScanTasks that contain both data and delete files.
-        """
-        snapshot = self.snapshot()
-        if not snapshot:
+        append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata)
+        if len(append_snapshots) == 0:
             return iter([])
 
+        append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}
+
+        manifests = {
+            manifest_file
+            for snapshot in append_snapshots
+            for manifest_file in snapshot.manifests(self.io)
+            if manifest_file.content == ManifestContent.DATA and manifest_file.added_snapshot_id in append_snapshot_ids
+        }
+
+        return self._manifest_group_planner.plan_files(
+            manifests=list(manifests),
+            manifest_entry_filter=lambda manifest_entry: manifest_entry.snapshot_id in append_snapshot_ids
+            and manifest_entry.status == ManifestEntryStatus.ADDED,
+        )
+
+    def _validate_and_resolve_snapshots(self) -> tuple[int, int]:
+        current_snapshot = self.table_metadata.current_snapshot()
+        to_snapshot_id = self.to_snapshot_id_inclusive
+
+        if self.from_snapshot_id_exclusive is None:
+            raise ValueError("Start snapshot of append scan unspecified, please set from_snapshot_id_exclusive")
+
+        if to_snapshot_id is None:
+            if current_snapshot is None:
+                raise ValueError("End snapshot of append scan is not set and table has no current snapshot")
+
+            to_snapshot_id = current_snapshot.snapshot_id
+        elif self._is_snapshot_missing(to_snapshot_id):
+            raise ValueError(f"End snapshot of append scan not found on table metadata: {to_snapshot_id}")
+
+        if self._is_snapshot_missing(self.from_snapshot_id_exclusive):
+            raise ValueError(f"Start snapshot of append scan not found on table metadata: {self.from_snapshot_id_exclusive}")
+
+        if not is_ancestor_of(to_snapshot_id, self.from_snapshot_id_exclusive, self.table_metadata):
+            raise ValueError(
+                f"Append scan's start snapshot {self.from_snapshot_id_exclusive} is not an ancestor of end snapshot {to_snapshot_id}"
+            )
+
+        return self.from_snapshot_id_exclusive, to_snapshot_id
+
+    # TODO: Note behaviour change from DataScan that we throw
+    def _is_snapshot_missing(self, snapshot_id: int) -> bool:
+        """Return whether the snapshot ID is missing in the table metadata."""
+        return self.table_metadata.snapshot_by_id(snapshot_id) is None
+
+    @staticmethod
+    def _appends_between(
+        from_snapshot_id_exclusive: int, to_snapshot_id_inclusive: int, table_metadata: TableMetadata
+    ) -> List[Snapshot]:
+        """Return the list of snapshots that are appends between two snapshot IDs."""
+        return [
+            snapshot
+            for snapshot in ancestors_between_ids(
+                from_snapshot_id_exclusive=from_snapshot_id_exclusive,
+                to_snapshot_id_inclusive=to_snapshot_id_inclusive,
+                table_metadata=table_metadata,
+            )
+            if snapshot.summary is not None and snapshot.summary.operation == Operation.APPEND
+        ]
+
+
+class ManifestGroupPlanner:

Review Comment:
   This class effectively extracts the planning logic based on manifest files that previously lived in `DataScan`. The code is largely the same (differences are pointed out in comments).
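   For orientation while reviewing, here is a minimal usage sketch of the API added in this diff. It is illustrative only: `metadata`, `io`, and the snapshot IDs are placeholders standing in for a loaded table's TableMetadata, its FileIO, and real snapshot IDs; none of them come from the PR itself.

       # Illustrative placeholders: `metadata` (TableMetadata) and `io` (FileIO)
       # would come from a loaded table; the snapshot IDs are made up.
       scan = IncrementalAppendScan(
           table_metadata=metadata,
           io=io,
           row_filter="id > 100",  # same row-filter semantics as DataScan planning
           selected_fields=("id", "name"),
       )

       # The chaining methods call update(), returning an updated scan rather
       # than mutating the existing one in place.
       scan = scan.from_snapshot_exclusive(1234).to_snapshot_inclusive(5678)

       # plan_files() validates the (from, to] snapshot range, collects DATA
       # manifests added by APPEND snapshots in that range, and keeps only
       # manifest entries ADDED by those snapshots.
       for task in scan.plan_files():
           print(task.file.file_path)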