smaheshwar-pltr commented on code in PR #2031:
URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2102624828


##########
pyiceberg/table/__init__.py:
##########
@@ -1688,102 +1888,252 @@ def _match_deletes_to_data_file(data_entry: ManifestEntry, positional_delete_ent
         return set()
 
 
-class DataScan(TableScan):
-    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
-        project = inclusive_projection(self.table_metadata.schema(), self.table_metadata.specs()[spec_id], self.case_sensitive)
-        return project(self.row_filter)
+class DataScan(FileBasedScan, TableScan):
+    """A scan of a table's data.
 
-    @cached_property
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        snapshot_id:
+            Optional Snapshot ID to time travel to. If None,
+            scans the table as of the current snapshot ID.
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+    """
+
+    def plan_files(self) -> Iterable[FileScanTask]:
+        """Plans the relevant files by filtering on the PartitionSpecs.
+
+        Returns:
+            List of FileScanTasks that contain both data and delete files.
+        """
+        snapshot = self.snapshot()
+        if not snapshot:
+            return iter([])
+
+        return self._manifest_group_planner.plan_files(manifests=snapshot.manifests(self.io))
+
+    # TODO: Document motivation and un-caching
+    @property
     def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
-        return KeyDefaultDict(self._build_partition_projection)
+        return self._manifest_group_planner.partition_filters
 
-    def _build_manifest_evaluator(self, spec_id: int) -> Callable[[ManifestFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        return manifest_evaluator(spec, self.table_metadata.schema(), self.partition_filters[spec_id], self.case_sensitive)
 
-    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], bool]:
-        spec = self.table_metadata.specs()[spec_id]
-        partition_type = spec.partition_type(self.table_metadata.schema())
-        partition_schema = Schema(*partition_type.fields)
-        partition_expr = self.partition_filters[spec_id]
+A = TypeVar("A", bound="IncrementalScan", covariant=True)
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: expression_evaluator(partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
 
-    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
-        schema = self.table_metadata.schema()
-        include_empty_files = strtobool(self.options.get("include_empty_files", "false"))
+class IncrementalScan(AbstractTableScan, ABC):
+    """A base class for incremental scans."""
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a single
-        # shared instance across multiple threads.
-        return lambda data_file: _InclusiveMetricsEvaluator(
-            schema,
-            self.row_filter,
-            self.case_sensitive,
-            include_empty_files,
-        ).eval(data_file)
+    from_snapshot_id_exclusive: Optional[int]
+    to_snapshot_id_inclusive: Optional[int]
 
-    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], ResidualEvaluator]:
-        spec = self.table_metadata.specs()[spec_id]
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        from_snapshot_id_exclusive: Optional[int] = None,
+        to_snapshot_id_inclusive: Optional[int] = None,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+    ):
+        super().__init__(table_metadata, io, row_filter, selected_fields, case_sensitive, options, limit)
+        self.from_snapshot_id_exclusive = from_snapshot_id_exclusive
+        self.to_snapshot_id_inclusive = to_snapshot_id_inclusive
 
-        # The lambda created here is run in multiple threads.
-        # So we avoid creating _EvaluatorExpression methods bound to a single
-        # shared instance across multiple threads.
-        # return lambda data_file: (partition_schema, partition_expr, self.case_sensitive)(data_file.partition)
-        from pyiceberg.expressions.visitors import residual_evaluator_of
+    def from_snapshot_exclusive(self: A, from_snapshot_id_exclusive: Optional[int]) -> A:
+        """Instructs this scan to look for changes starting from a particular snapshot (exclusive).
 
-        # assert self.row_filter == False
-        return lambda datafile: (
-            residual_evaluator_of(
-                spec=spec,
-                expr=self.row_filter,
-                case_sensitive=self.case_sensitive,
-                schema=self.table_metadata.schema(),
-            )
-        )
+        Args:
+            from_snapshot_id_exclusive: the start snapshot ID (exclusive)
 
-    def _check_sequence_number(self, min_sequence_number: int, manifest: ManifestFile) -> bool:
-        """Ensure that no manifests are loaded that contain deletes that are older than the data.
+        Returns:
+            this for method chaining
+        """
+        return self.update(from_snapshot_id_exclusive=from_snapshot_id_exclusive)
+
+    def to_snapshot_inclusive(self: A, to_snapshot_id_inclusive: Optional[int]) -> A:
+        """Instructs this scan to look for changes up to a particular snapshot (inclusive).
 
         Args:
-            min_sequence_number (int): The minimal sequence number.
-            manifest (ManifestFile): A ManifestFile that can be either data or deletes.
+            to_snapshot_id_inclusive: the end snapshot ID (inclusive)
 
         Returns:
-            Boolean indicating if it is either a data file, or a relevant delete file.
+            this for method chaining
         """
-        return manifest.content == ManifestContent.DATA or (
-            # Not interested in deletes that are older than the data
-            manifest.content == ManifestContent.DELETES
-            and (manifest.sequence_number or INITIAL_SEQUENCE_NUMBER) >= min_sequence_number
+        return self.update(to_snapshot_id_inclusive=to_snapshot_id_inclusive)
+
+    def projection(self) -> Schema:
+        current_schema = self.table_metadata.schema()
+
+        if "*" in self.selected_fields:
+            return current_schema
+
+        return current_schema.select(*self.selected_fields, case_sensitive=self.case_sensitive)
+
+
+class IncrementalAppendScan(IncrementalScan, FileBasedScan):
+    """An incremental scan of a table's data that accumulates appended data between two snapshots.
+
+    Args:
+        row_filter:
+            A string or BooleanExpression that describes the
+            desired rows
+        selected_fields:
+            A tuple of strings representing the column names
+            to return in the output dataframe.
+        case_sensitive:
+            If True column matching is case sensitive
+        from_snapshot_id_exclusive:
+            Optional ID of the "from" snapshot, to start the incremental scan from, exclusively. When the scan is
+            ultimately planned, this must not be None.
+        to_snapshot_id_inclusive:
+            Optional ID of the "to" snapshot, to end the incremental scan at, inclusively.
+            Omitting it will default to the table's current snapshot.
+        options:
+            Additional Table properties as a dictionary of
+            string key value pairs to use for this scan.
+        limit:
+            An integer representing the number of rows to
+            return in the scan result. If None, fetches all
+            matching rows.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
+        selected_fields: Tuple[str, ...] = ("*",),
+        case_sensitive: bool = True,
+        from_snapshot_id_exclusive: Optional[int] = None,
+        to_snapshot_id_inclusive: Optional[int] = None,
+        options: Properties = EMPTY_DICT,
+        limit: Optional[int] = None,
+    ):
+        super().__init__(
+            table_metadata,
+            io,
+            row_filter,
+            selected_fields,
+            case_sensitive,
+            from_snapshot_id_exclusive,
+            to_snapshot_id_inclusive,
+            options,
+            limit,
         )
 
     def plan_files(self) -> Iterable[FileScanTask]:
-        """Plans the relevant files by filtering on the PartitionSpecs.
+        from_snapshot_id, to_snapshot_id = self._validate_and_resolve_snapshots()
 
-        Returns:
-            List of FileScanTasks that contain both data and delete files.
-        """
-        snapshot = self.snapshot()
-        if not snapshot:
+        append_snapshots: List[Snapshot] = self._appends_between(from_snapshot_id, to_snapshot_id, self.table_metadata)
+        if len(append_snapshots) == 0:
             return iter([])
 
+        append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}
+
+        manifests = {

Review Comment:
   Note that we maintain a set of manifest files, just like https://github.com/apache/iceberg/blob/1911c94ea605a3d3f10a1994b046f00a5e9fdceb/core/src/main/java/org/apache/iceberg/BaseIncrementalAppendScan.java#L70-L74.
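
   For anyone following along, a minimal sketch of that dedup idea (illustrative only; the PR's actual comprehension is truncated above, and `unique_append_manifests` is a hypothetical helper, not code from this PR). Keying by `manifest_path` also sidesteps relying on `ManifestFile` being hashable:

   ```python
   from typing import Dict, List, Set

   from pyiceberg.io import FileIO
   from pyiceberg.manifest import ManifestFile
   from pyiceberg.table.snapshots import Snapshot


   def unique_append_manifests(append_snapshots: List[Snapshot], io: FileIO) -> List[ManifestFile]:
       # IDs of the snapshots that appended data in the (from, to] range.
       append_snapshot_ids: Set[int] = {snapshot.snapshot_id for snapshot in append_snapshots}

       # A manifest can be reachable from several of these snapshots; dedup by
       # path so each is planned once, and keep only manifests that were added
       # by one of the append snapshots, as in the linked Java code.
       by_path: Dict[str, ManifestFile] = {
           manifest_file.manifest_path: manifest_file
           for snapshot in append_snapshots
           for manifest_file in snapshot.manifests(io)
           if manifest_file.added_snapshot_id in append_snapshot_ids
       }
       return list(by_path.values())
   ```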
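
   And a hypothetical usage sketch, based only on the constructor and chaining methods in this hunk (`table`, the snapshot IDs, and the filter are placeholders; the Table-level entry point is outside this diff):

   ```python
   # Assumes an existing pyiceberg Table bound to the name `table`.
   scan = IncrementalAppendScan(
       table_metadata=table.metadata,
       io=table.io,
       row_filter="id > 100",           # placeholder filter
       selected_fields=("id", "data"),  # placeholder columns
   )

   # 123 and 456 are placeholder snapshot IDs; the scanned range is (123, 456].
   tasks = list(scan.from_snapshot_exclusive(123).to_snapshot_inclusive(456).plan_files())
   ```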


