smaheshwar-pltr commented on code in PR #2031:
URL: https://github.com/apache/iceberg-python/pull/2031#discussion_r2102778115


##########
pyiceberg/table/__init__.py:
##########
@@ -1834,116 +2186,79 @@ def plan_files(self) -> Iterable[FileScanTask]:
             for data_entry in data_entries
         ]
 
-    def to_arrow(self) -> pa.Table:
-        """Read an Arrow table eagerly from this DataScan.
-
-        All rows will be loaded into memory at once.
-
-        Returns:
-            pa.Table: Materialized Arrow Table from the Iceberg table's 
DataScan
-        """
-        from pyiceberg.io.pyarrow import ArrowScan
-
-        return ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, 
self.case_sensitive, self.limit
-        ).to_table(self.plan_files())
-
-    def to_arrow_batch_reader(self) -> pa.RecordBatchReader:
-        """Return an Arrow RecordBatchReader from this DataScan.
-
-        For large results, using a RecordBatchReader requires less memory than
-        loading an Arrow Table for the same DataScan, because a RecordBatch
-        is read one at a time.
-
-        Returns:
-            pa.RecordBatchReader: Arrow RecordBatchReader from the Iceberg 
table's DataScan
-                which can be used to read a stream of record batches one by 
one.
-        """
-        import pyarrow as pa
-
-        from pyiceberg.io.pyarrow import ArrowScan, schema_to_pyarrow
-
-        target_schema = schema_to_pyarrow(self.projection())
-        batches = ArrowScan(
-            self.table_metadata, self.io, self.projection(), self.row_filter, 
self.case_sensitive, self.limit
-        ).to_record_batches(self.plan_files())
+    @cached_property
+    def partition_filters(self) -> KeyDefaultDict[int, BooleanExpression]:
+        return KeyDefaultDict(self._build_partition_projection)
 
-        return pa.RecordBatchReader.from_batches(
-            target_schema,
-            batches,
-        ).cast(target_schema)
+    def _build_partition_projection(self, spec_id: int) -> BooleanExpression:
+        project = inclusive_projection(self.table_metadata.schema(), 
self.table_metadata.specs()[spec_id], self.case_sensitive)
+        return project(self.row_filter)
 
-    def to_pandas(self, **kwargs: Any) -> pd.DataFrame:
-        """Read a Pandas DataFrame eagerly from this Iceberg table.
+    def _build_manifest_evaluator(self, spec_id: int) -> 
Callable[[ManifestFile], bool]:
+        spec = self.table_metadata.specs()[spec_id]
+        return manifest_evaluator(spec, self.table_metadata.schema(), 
self.partition_filters[spec_id], self.case_sensitive)
 
-        Returns:
-            pd.DataFrame: Materialized Pandas Dataframe from the Iceberg table
-        """
-        return self.to_arrow().to_pandas(**kwargs)
+    def _build_partition_evaluator(self, spec_id: int) -> Callable[[DataFile], 
bool]:
+        spec = self.table_metadata.specs()[spec_id]
+        partition_type = spec.partition_type(self.table_metadata.schema())
+        partition_schema = Schema(*partition_type.fields)
+        partition_expr = self.partition_filters[spec_id]
 
-    def to_duckdb(self, table_name: str, connection: 
Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
-        """Shorthand for loading the Iceberg Table in DuckDB.
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _EvaluatorExpression methods bound to a single
+        # shared instance across multiple threads.
+        return lambda data_file: expression_evaluator(partition_schema, 
partition_expr, self.case_sensitive)(data_file.partition)
 
-        Returns:
-            DuckDBPyConnection: In memory DuckDB connection with the Iceberg 
table.
-        """
-        import duckdb
+    def _build_metrics_evaluator(self) -> Callable[[DataFile], bool]:
+        schema = self.table_metadata.schema()
+        include_empty_files = 
strtobool(self.options.get("include_empty_files", "false"))
 
-        con = connection or duckdb.connect(database=":memory:")
-        con.register(table_name, self.to_arrow())
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _InclusiveMetricsEvaluator methods bound to a 
single
+        # shared instance across multiple threads.
+        return lambda data_file: _InclusiveMetricsEvaluator(
+            schema,
+            self.row_filter,
+            self.case_sensitive,
+            include_empty_files,
+        ).eval(data_file)
 
-        return con
+    def _build_residual_evaluator(self, spec_id: int) -> Callable[[DataFile], 
ResidualEvaluator]:
+        spec = self.table_metadata.specs()[spec_id]
 
-    def to_ray(self) -> ray.data.dataset.Dataset:
-        """Read a Ray Dataset eagerly from this Iceberg table.
+        # The lambda created here is run in multiple threads.
+        # So we avoid creating _EvaluatorExpression methods bound to a single
+        # shared instance across multiple threads.
+        # return lambda data_file: (partition_schema, partition_expr, 
self.case_sensitive)(data_file.partition)
+        from pyiceberg.expressions.visitors import residual_evaluator_of
 
-        Returns:
-            ray.data.dataset.Dataset: Materialized Ray Dataset from the 
Iceberg table
-        """
-        import ray
+        # assert self.row_filter == False
+        return lambda datafile: (
+            residual_evaluator_of(
+                spec=spec,
+                expr=self.row_filter,
+                case_sensitive=self.case_sensitive,
+                schema=self.table_metadata.schema(),
+            )
+        )
 
-        return ray.data.from_arrow(self.to_arrow())
+    # TODO: Document that this method was was made static
+    @staticmethod

Review Comment:
   I made this method static (it wasn't before - the one on `DataScan`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to