smaheshwar-pltr commented on code in PR #2031:
URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2101329296
##########
pyiceberg/table/__init__.py:
##########
@@ -1536,10 +1595,177 @@ def __init__(
         self.row_filter = _parse_row_filter(row_filter)
         self.selected_fields = selected_fields
         self.case_sensitive = case_sensitive
-        self.snapshot_id = snapshot_id
         self.options = options
         self.limit = limit

+    @abstractmethod
+    def projection(self) -> Schema: ...
+
+    @abstractmethod
+    def plan_files(self) -> Iterable[ScanTask]: ...
+
+    @abstractmethod
+    def to_arrow(self) -> pa.Table: ...
+
+    @abstractmethod
+    def count(self) -> int: ...
+
+    def select(self: S, *field_names: str) -> S:
+        if "*" in self.selected_fields:
+            return self.update(selected_fields=field_names)
+        return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names))))
+
+    def filter(self: S, expr: Union[str, BooleanExpression]) -> S:
+        return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr)))
+
+    def with_case_sensitive(self: S, case_sensitive: bool = True) -> S:
+        return self.update(case_sensitive=case_sensitive)
+
+    def update(self: S, **overrides: Any) -> S:
+        """Create a copy of this table scan with updated fields."""
+        return type(self)(**{**self.__dict__, **overrides})
+
+    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
+        """Read a Pandas DataFrame eagerly from this Iceberg table scan.
+
+        Returns:
+            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table scan
+        """
+        return self.to_arrow().to_pandas(**kwargs)
+
+    def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+        """Shorthand for loading this table scan in DuckDB.
+
+        Returns:
+            DuckDBPyConnection: In memory DuckDB connection with the Iceberg table scan.
+        """
+        import duckdb
+
+        con = connection or duckdb.connect(database=":memory:")
+        con.register(table_name, self.to_arrow())
+
+        return con
+
+    def to_ray(self) -> ray.data.dataset.Dataset:
+        """Read a Ray Dataset eagerly from this Iceberg table scan.
+
+        Returns:
+            ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table scan
+        """
+        import ray
+
+        return ray.data.from_arrow(self.to_arrow())
+
+    def to_polars(self) -> pl.DataFrame:
+        """Read a Polars DataFrame from this Iceberg table scan.
+
+        Returns:
+            pl.DataFrame: Materialized Polars Dataframe from the Iceberg table scan
+        """
+        import polars as pl
+
+        result = pl.from_arrow(self.to_arrow())
+        if isinstance(result, pl.Series):
+            result = result.to_frame()
+
+        return result
+
+
+class FileBasedScan(AbstractTableScan, ABC):
+    """A base class for table scans that plan FileScanTasks."""
+
+    @abstractmethod
+    def plan_files(self) -> Iterable[FileScanTask]: ...
+
+    def to_arrow(self) -> pa.Table:
+        """Read an Arrow table eagerly from this scan.
+
+        All rows will be loaded into memory at once.
+
+        Returns:
+            pa.Table: Materialized Arrow Table from the Iceberg table scan
+        """
+        from pyiceberg.io.pyarrow import ArrowScan
+
+        return ArrowScan(
+            self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit
+        ).to_table(self.plan_files())
+
+    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
+        """Return an Arrow RecordBatchReader from this scan.
+
+        For large results, using a RecordBatchReader requires less memory than
+        loading an Arrow Table for the same DataScan, because a RecordBatch
+        is read one at a time.
+
+        Returns:
+            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg table scan
+                which can be used to read a stream of record batches one by one.
+ """ + import pyarrow as pa + + from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow + + target_schema = schema_to_pyarrow(self.projection()) + batches = ArrowScan( + self.table_metadata, self.io, self.projection(), self.row_filter, self.case_sensitive, self.limit + ).to_record_batches(self.plan_files()) + + return pa.RecordBatchReader.from_batches( + target_schema, + batches, + ).cast(target_schema) + + def count(self) -> int: + from pyiceberg.io.pyarrow import ArrowScan + + # Usage: Calculates the total number of records in a Scan that haven't had positional deletes. + res = 0 + # every task is a FileScanTask + tasks = self.plan_files() + + for task in tasks: + # task.residual is a Boolean Expression if the filter condition is fully satisfied by the + # partition value and task.delete_files represents that positional delete haven't been merged yet + # hence those files have to read as a pyarrow table applying the filter and deletes + if task.residual == AlwaysTrue() and len(task.delete_files) == 0: + # Every File has a metadata stat that stores the file record count + res += task.file.record_count + else: + arrow_scan = ArrowScan( + table_metadata=self.table_metadata, + io=self.io, + projected_schema=self.projection(), + row_filter=self.row_filter, + case_sensitive=self.case_sensitive, + ) + tbl = arrow_scan.to_table([task]) + res += len(tbl) + return res + + +T = TypeVar("T", bound="TableScan", covariant=True) + + +class TableScan(AbstractTableScan, ABC): Review Comment: The hierarchy is a bit weird here. I figured the concern with https://github.com/apache/iceberg-python/pull/533 was user-facing changes on a public class, so tried to minimise -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org