smaheshwar-pltr commented on code in PR #2031: URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2102720237
########## pyiceberg/table/__init__.py: ########## @@ -1536,10 +1595,181 @@ def __init__( self.row_filter = _parse_row_filter(row_filter) self.selected_fields = selected_fields self.case_sensitive = case_sensitive - self.snapshot_id = snapshot_id self.options = options self.limit = limit + @abstractmethod + def projection(self) -> Schema: ... + + @abstractmethod + def plan_files(self) -> Iterable[ScanTask]: ... + + @abstractmethod + def to_arrow(self) -> pa.Table: ... + + @abstractmethod + def count(self) -> int: ... + + def select(self: S, *field_names: str) -> S: + if "*" in self.selected_fields: + return self.update(selected_fields=field_names) + return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) + + def filter(self: S, expr: Union[str, BooleanExpression]) -> S: + return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) + + def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: + return self.update(case_sensitive=case_sensitive) + + def update(self: S, **overrides: Any) -> S: + """Create a copy of this table scan with updated fields.""" + return type(self)(**{**self.__dict__, **overrides}) + + def to_pandas(self, **kwargs: Any) -> pd.DataFrame: + """Read a Pandas DataFrame eagerly from this Iceberg table scan. + + Returns: + pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table scan + """ + return self.to_arrow().to_pandas(**kwargs) + + def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + """Shorthand for loading this table scan in DuckDB. + + Returns: + DuckDBPyConnection: In memory DuckDB connection with the Iceberg table scan. + """ + import duckdb + + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow()) + + return con + + def to_ray(self) -> ray.data.dataset.Dataset: + """Read a Ray Dataset eagerly from this Iceberg table scan. + + Returns: + ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table scan + """ + import ray + + return ray.data.from_arrow(self.to_arrow()) + + def to_polars(self) -> pl.DataFrame: + """Read a Polars DataFrame from this Iceberg table scan. + + Returns: + pl.DataFrame: Materialized Polars Dataframe from the Iceberg table scan + """ + import polars as pl + + result = pl.from_arrow(self.to_arrow()) + if isinstance(result, pl.Series): + result = result.to_frame() + + return result + + +class FileBasedScan(AbstractTableScan, ABC): + """A base class for table scans that plan FileScanTasks.""" + + @cached_property + def _manifest_group_planner(self) -> ManifestGroupPlanner: Review Comment: To me, the abstraction of something that handles planning such tasks from manifests makes sense and naturally reduces duplication with the new append scan type. I think other design decisions are possible like introducing a member for it or rethinking the abstraction. This feels fine (I like keeping this class thin) ########## pyiceberg/table/__init__.py: ########## @@ -1536,10 +1595,181 @@ def __init__( self.row_filter = _parse_row_filter(row_filter) self.selected_fields = selected_fields self.case_sensitive = case_sensitive - self.snapshot_id = snapshot_id self.options = options self.limit = limit + @abstractmethod + def projection(self) -> Schema: ... + + @abstractmethod + def plan_files(self) -> Iterable[ScanTask]: ... + + @abstractmethod + def to_arrow(self) -> pa.Table: ... + + @abstractmethod + def count(self) -> int: ... + + def select(self: S, *field_names: str) -> S: + if "*" in self.selected_fields: + return self.update(selected_fields=field_names) + return self.update(selected_fields=tuple(set(self.selected_fields).intersection(set(field_names)))) + + def filter(self: S, expr: Union[str, BooleanExpression]) -> S: + return self.update(row_filter=And(self.row_filter, _parse_row_filter(expr))) + + def with_case_sensitive(self: S, case_sensitive: bool = True) -> S: + return self.update(case_sensitive=case_sensitive) + + def update(self: S, **overrides: Any) -> S: + """Create a copy of this table scan with updated fields.""" + return type(self)(**{**self.__dict__, **overrides}) + + def to_pandas(self, **kwargs: Any) -> pd.DataFrame: + """Read a Pandas DataFrame eagerly from this Iceberg table scan. + + Returns: + pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table scan + """ + return self.to_arrow().to_pandas(**kwargs) + + def to_duckdb(self, table_name: str, connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection: + """Shorthand for loading this table scan in DuckDB. + + Returns: + DuckDBPyConnection: In memory DuckDB connection with the Iceberg table scan. + """ + import duckdb + + con = connection or duckdb.connect(database=":memory:") + con.register(table_name, self.to_arrow()) + + return con + + def to_ray(self) -> ray.data.dataset.Dataset: + """Read a Ray Dataset eagerly from this Iceberg table scan. + + Returns: + ray.data.dataset.Dataset: Materialized Ray Dataset from the Iceberg table scan + """ + import ray + + return ray.data.from_arrow(self.to_arrow()) + + def to_polars(self) -> pl.DataFrame: + """Read a Polars DataFrame from this Iceberg table scan. + + Returns: + pl.DataFrame: Materialized Polars Dataframe from the Iceberg table scan + """ + import polars as pl + + result = pl.from_arrow(self.to_arrow()) + if isinstance(result, pl.Series): + result = result.to_frame() + + return result + + +class FileBasedScan(AbstractTableScan, ABC): + """A base class for table scans that plan FileScanTasks.""" + + @cached_property + def _manifest_group_planner(self) -> ManifestGroupPlanner: Review Comment: To me, the abstraction of something that handles planning such tasks from manifests makes sense and naturally reduces duplication with the new append scan type. I think other design decisions are possible like introducing a member for it or rethinking the abstraction. This felt fine to me (I like keeping this class thin) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org