smaheshwar-pltr commented on code in PR #2031:
URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2102624828
##########
pyiceberg/table/__init__.py:
##########
@@ -1688,102 +1888,252 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
         return set()
 
 
-class DataScan(TableScan):
-    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
-        return project(self.row_filter)
+class DataScan(FileBasedScan, TableScan):
+    """A scan of a table's data.
 
-    @cached_property
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        snapshot_id:
+            Optional Snapshot ID to time travel to. If None,
+            scans the table as of the current snapshot ID.
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+    """
+
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plans the relevant files by filtering on the PartitionSpecs.
+
+        Returns:
+            List of FileScanTasks that contain both data and delete files.
+        """
+        snapshot = self.snapshot()
+        if not snapshot:
+            return iter([])
+
+        return self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io))
+
+    # TODO: Document motivation and un-caching
+    @property
     def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
-        return KeyDefaultDict(self._build_partition_projection)
+        return self._manifest_group_planner.partition_filters
 
-    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
 
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
+A = TypeVar("A", bound="IncrementalScan", covariant=True)
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
 
-    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
-        schema = self.table_metadata.schema()
-        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
+class IncrementalScan(AbstractTableScan, ABC):
+    """A base class for incremental scans."""
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: _InclusiveMetricsEvaluator(
-            schema,
-            self.row_filter,
-            self.case_sensitive,
-            include_empty_files,
-        ).eval(data_file)
+    from_snapshot_id_exclusive: Optional[int]
+    to_snapshot_id_inclusive: Optional[int]
 
-    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
-        spec = self.table_metadata.specs()[spec_id]
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        from_snapshot_id_exclusive: Optional[int] = None,
+        to_snapshot_id_inclusive: Optional[int] = None,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+    ):
+        super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit)
+        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+        self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        # return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
-        from pyiceberg.expressions.visitors import residual_evaluator_of
+    def from_snapshot_exclusive(self: A, from_snapshot_id_exclusive: Optional[int]) -> A:
+        """Instructs this scan to look for changes starting from a particular snapshot (exclusive).
 
-        # assert self.row_filter == False
-        return lambda datafile: (
-            residual_evaluator_of(
-                spec=spec,
-                expr=self.row_filter,
-                case_sensitive=self.case_sensitive,
-                schema=self.table_metadata.schema(),
-            )
-        )
+        Args:
+            from_snapshot_id_exclusive: the start snapshot ID (exclusive)
 
-    def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
-        """Ensure that no manifests are loaded that contain deletes that are older than the data.
+        Returns:
+            this for method chaining
+        """
+        return self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
+
+    def to_snapshot_inclusive(self: A, to_snapshot_id_inclusive: Optional[int]) -> A:
+        """Instructs this scan to look for changes up to a particular snapshot (inclusive).
 
         Args:
-            min_sequence_number (int): The minimal sequence number.
-            manifest (ManifestFile): A ManifestFile that can be either data or deletes.
+            to_snapshot_id_inclusive: the end snapshot ID (inclusive)
 
         Returns:
-            Boolean indicating if it is either a data file, or a relevant delete file.
+            this for method chaining
         """
-        return manifest.content == ManifestContent.DATA or (
-            # Not interested in deletes that are older than the data
-            manifest.content == ManifestContent.DELETES
-            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
+        return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
+
+    def projection(self) -> Schema:
+        current_schema = self.table_metadata.schema()
+
+        if "*" in self.selected_fields:
+            return current_schema
+
+        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
+
+
+class IncrementalAppendScan(IncrementalScan, FileBasedScan):
+    """An incremental scan of a table's data that accumulates appended data between two snapshots.
+
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        from_snapshot_id_exclusive:
+            Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is
+            ultimately planned, this must not be None.
+        to_snapshot_id_inclusive:
+            Optional ID of the "to" snapshot, to end the incremental scan at, inclusively.
+            Omitting it will default to the table's current snapshot.
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        from_snapshot_id_exclusive: Optional[int] = None,
+        to_snapshot_id_inclusive: Optional[int] = None,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+    ):
+        super().__init__(
+            table_metadata,
+            io,
+            row_filter,
+            selected_fields,
+            case_sensitive,
+            from_snapshot_id_exclusive,
+            to_snapshot_id_inclusive,
+            options,
+            limit,
         )
 
     def plan_files(self) -> Iterable[FileScanTask]:
-        """Plans the relevant files by filtering on the PartitionSpecs.
+        from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots()
 
-        Returns:
-            List of FileScanTasks that contain both data and delete files.
-        """
-        snapshot = self.snapshot()
-        if not snapshot:
+        append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata)
+        if len(append_snapshots) == 0:
             return iter([])
 
+        append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}
+
+        manifests = {

Review Comment:
   Note that we maintain a set of manifest files, just like https://github.com/apache/iceberg/blob/1911c94ea605a3d3f10a1994b046f00a5e9fdceb/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L70-L74.
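   For anyone skimming the truncated hunk, here is a minimal sketch of the
   deduplication that set buys us. It is not the PR's code (the hunk cuts off
   at `manifests = {`); it assumes only `Snapshot.manifests(io)` and
   `ManifestFile.added_snapshot_id`, which exist in pyiceberg today, plus
   hashable `ManifestFile`s, which keeping a set implies:

       from typing import List, Set

       from pyiceberg.io import FileIO
       from pyiceberg.manifest import ManifestFile
       from pyiceberg.table.snapshots import Snapshot

       def append_manifests(append_snapshots: List[Snapshot], io: FileIO) -> Set[ManifestFile]:
           # Hypothetical helper, not the PR's implementation: gather the
           # manifests of every append snapshot in the (from, to] range.
           # A set dedups ManifestFile entries shared by several snapshots'
           # manifest lists, so each manifest is planned at most once,
           # mirroring the linked Java code.
           append_snapshot_ids = {snapshot.snapshot_id for snapshot in append_snapshots}
           return {
               manifest
               for snapshot in append_snapshots
               for manifest in snapshot.manifests(io)
               # Keep only manifests whose entries were added by one of the
               # append snapshots; older manifests carry no newly appended rows.
               if manifest.added_snapshot_id in append_snapshot_ids
           }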
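   Relatedly, since the docstrings in this hunk describe a builder-style API,
   a hedged usage sketch; the `table.incremental_append_scan()` entry point
   and the `from_id`/`to_id` values are assumptions for illustration, not
   part of this hunk:

       # Hypothetical usage of the chaining API the docstrings describe.
       scan = (
           table.incremental_append_scan()
           .from_snapshot_exclusive(from_id)  # changes strictly after this snapshot
           .to_snapshot_inclusive(to_id)      # up to and including this snapshot
       )
       for task in scan.plan_files():  # FileScanTasks covering only appended data
           print(task.file.file_path)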