smaheshwar-pltr commented on code in PR #2031:
URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2101329296


##########
pyiceberg/table/__init__.py:
##########
@@ -1536,10 +1595,177 @@ def __init__(
         self.row_filter = _parse_row_filter(row_filter)
         self.selected_fields = selected_fields
         self.case_sensitive = case_sensitive
-        self.snapshot_id = snapshot_id
         self.options = options
         self.limit = limit
 
+    @abstractmethod
+    def projection(self) -> Schema: ...
+
+    @abstractmethod
+    def plan_files(self) -> Iterable[ScanTask]: ...
+
+    @abstractmethod
+    def to_arrow(self) -> pa.Table: ...
+
+    @abstractmethod
+    def count(self) -> int: ...
+
+    def select(self: S, *field_names: str) -> S:
+        if "*" in self.selected_fields:
+            return self.update(selected_fields=field_names)
+        return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))
+
+    def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
+        return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr)))
+
+    def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
+        return self.update(case_sensitive=case_sensitive)
+
+    def update(self: S, **overrides: Any) -> S:
+        """Create a copy of this table scan with updated fields."""
+        return type(self)(**{**self.__dict__, **overrides})
+
+    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
+        """Read a Pandas DataFrame eagerly from this Iceberg table scan.
+
+        Returns:
+            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table scan
+        """
+        return self.to_arrow().to_pandas(**kwargs)
+
+    def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+        """Shorthand for loading this table scan in DuckDB.
+
+        Returns:
+            DuckDBPyConnection: In memory DuckDB connection with the Iceberg table scan.
+        """
+        import duckdb
+
+        con = connection or duckdb.connect(database=":memory:")
+        con.register(table_name, self.to_arrow())
+
+        return con
+
+    def to_ray(self) -> ray.data.dataset.Dataset:
+        """Read a Ray Dataset eagerly from this Iceberg table scan.
+
+        Returns:
+            ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table scan
+        """
+        import ray
+
+        return ray.data.from_arrow(self.to_arrow())
+
+    def to_polars(self) -> pl.DataFrame:
+        """Read a Polars DataFrame from this Iceberg table scan.
+
+        Returns:
+            pl.DataFrame: Materialized Polars Dataframe from the Iceberg table scan
+        """
+        import polars as pl
+
+        result = pl.from_arrow(self.to_arrow())
+        if isinstance(result, pl.Series):
+            result = result.to_frame()
+
+        return result
+
+
+class FileBasedScan(AbstractTableScan, ABC):
+    """A base class for table scans that plan FileScanTasks."""
+
+    @abstractmethod
+    def plan_files(self) -> Iterable[FileScanTask]: ...
+
+    def to_arrow(self) -> pa.Table:
+        """Read an Arrow table eagerly from this scan.
+
+        All rows will be loaded into memory at once.
+
+        Returns:
+            pa.Table: Materialized Arrow Table from the Iceberg table scan
+        """
+        from pyiceberg.io.pyarrow import ArrowScan
+
+        return ArrowScan(
+            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+        ).to_table(self.plan_files())
+
+    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
+        """Return an Arrow RecordBatchReader from this scan.
+
+        For large results, using a RecordBatchReader requires less memory than
+        loading an Arrow Table for the same DataScan, because a RecordBatch
+        is read one at a time.
+
+        Returns:
+            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table scan
+                which can be used to read a stream of record batches one by one.
+        """
+        import pyarrow as pa
+
+        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
+
+        target_schema = schema_to_pyarrow(self.projection())
+        batches = ArrowScan(
+            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+        ).to_record_batches(self.plan_files())
+
+        return pa.RecordBatchReader.from_batches(
+            target_schema,
+            batches,
+        ).cast(target_schema)
+
+    def count(self) -> int:
+        from pyiceberg.io.pyarrow import ArrowScan
+
+        # Usage: Calculates the total number of records in a Scan that haven't had positional deletes.
+        res = 0
+        # every task is a FileScanTask
+        tasks = self.plan_files()
+
+        for task in tasks:
+            # task.residual is a Boolean Expression if the filter condition is fully satisfied by the
+            # partition value and task.delete_files represents that positional delete haven't been merged yet
+            # hence those files have to read as a pyarrow table applying the filter and deletes
+            if task.residual == AlwaysTrue() and len(task.delete_files) == 0:
+                # Every File has a metadata stat that stores the file record count
+                res += task.file.record_count
+            else:
+                arrow_scan = ArrowScan(
+                    table_metadata=self.table_metadata,
+                    io=self.io,
+                    projected_schema=self.projection(),
+                    row_filter=self.row_filter,
+                    case_sensitive=self.case_sensitive,
+                )
+                tbl = arrow_scan.to_table([task])
+                res += len(tbl)
+        return res
+
+
+T = TypeVar("T", bound="TableScan", covariant=True)
+
+
+class TableScan(AbstractTableScan, ABC):

Review Comment:
   The hierarchy is a bit weird here. I figured the concern with 
https://github.com/apache/iceberg-python/pull/533 was user-facing changes on a 
public class, so I tried to minimise those.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to