kevinjqliu commented on code in PR #1043:
URL: https://github.com/apache/iceberg-python/pull/1043#discussion_r1716215131


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,192 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+    """Projects an Iceberg Table to a PyArrow construct.
+
+    Attributes:
+        _table_metadata: Current table metadata of the Iceberg table
+        _io: PyIceberg FileIO implementation from which to fetch the io properties
+        _fs: PyArrow FileSystem to use to read the files
+        _projected_schema: Iceberg Schema to project onto the data files
+        _bound_row_filter: Schema bound row expression to filter the data with
+        _case_sensitive: Case sensitivity when looking up column names
+        _limit: Limit the number of records.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        """Whether to represent data as large arrow types.
+
+        Defaults to True.
+        """
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        """Set of field IDs that should be projected from the data files."""
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        """Project the Iceberg table to a pa.Table.
+
+        Returns a pa.Table with data from the Iceberg table by resolving the
+        right columns that match the current table schema. Only data that
+        matches the provided row_filter expression is returned.
+
+        Args:
+            tasks: FileScanTasks representing the data files and delete files to read from.
+
+        Returns:
+            A PyArrow table. Result is capped at the limit, if specified.
+
+        Raises:
+            ResolveError: When a required field cannot be found in the file
+            ValueError: When a field type in the file cannot be projected to the schema type
+        """
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _project_table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._project_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None
+
+        futures = [
+            executor.submit(
+                _project_table_from_scan_task,
+                task,
+            )
+            for task in tasks
+        ]
+        total_row_count = 0
+        # for consistent ordering, we need to maintain future order
+        futures_index = {f: i for i, f in enumerate(futures)}
+        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+        for future in concurrent.futures.as_completed(futures):
+            completed_futures.add(future)
+            if table_result := future.result():
+                total_row_count += len(table_result)
+            # stop early if limit is satisfied
+            if self._limit is not None and total_row_count >= self._limit:
+                break
+
+        # by now, we've either completed all tasks or satisfied the limit
+        if self._limit is not None:
+            _ = [f.cancel() for f in futures if not f.done()]
+
+        tables = [f.result() for f in completed_futures if f.result()]
+
+        if len(tables) < 1:
+            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+
+        result = pa.concat_tables(tables, promote_options="permissive")
+
+        if self._limit is not None:
+            return result.slice(0, self._limit)
+
+        return result
+
+    def project_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
+        """Project the Iceberg table to an Iterator[pa.RecordBatch].
+
+        Returns an Iterator of pa.RecordBatch with data from the Iceberg table
+        by resolving the right columns that match the current table schema.
+        Only data that matches the provided row_filter expression is returned.
+
+        Args:
+            tasks: FileScanTasks representing the data files and delete files to read from.
+
+        Returns:
+            An Iterator of PyArrow RecordBatches. Result is capped at the limit,
+            if specified.
+
+        Raises:
+            ResolveError: When a required field cannot be found in the file
+            ValueError: When a field type in the file cannot be projected to the schema type
+        """
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        return self._project_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
+
+    def _project_batches_from_scan_tasks_and_deletes(
+        self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
+    ) -> Iterator[pa.RecordBatch]:
+        limit = self._limit
+        for task in tasks:
+            batches = _task_to_record_batches(
+                self._fs,
+                task,
+                self._bound_row_filter,
+                self._projected_schema,
+                self._projected_field_ids,
+                deletes_per_file.get(task.file.file_path),
+                self._case_sensitive,
+                self._table_metadata.name_mapping(),
+                self._use_large_types,
+            )
+            for batch in batches:
+                if limit is not None:
+                    if len(batch) >= limit:
+                        yield batch.slice(0, limit)
+                        break
+                    limit -= len(batch)
+                yield batch
+
+
+@deprecated(
+    deprecated_in="0.8.0",
+    removed_in="0.9.0",
+    help_message="project_table is deprecated. Use PyArrowProjector instead.",

Review Comment:
   nit:
   ```suggestion
       help_message="project_table is deprecated. Use 
PyArrowProjector.project_table instead.",
   ```



##########
pyiceberg/io/__init__.py:
##########
@@ -354,3 +356,14 @@ def load_file_io(properties: Properties = EMPTY_DICT, location: Optional[str] =
         raise ModuleNotFoundError(
             'Could not load a FileIO, please consider installing one: pip3 install "pyiceberg[pyarrow]", for more options refer to the docs.'
         ) from e
+
+
+def _parse_location(location: str) -> Tuple[str, str, str]:

Review Comment:
   this is copied over from [PyArrowFileIO.parse_location](https://github.com/apache/iceberg-python/blob/3821833c329d217cc4d2514234267bcb2ca1b700/pyiceberg/io/pyarrow.py#L336)
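   for reference, a rough sketch of that helper, modeled on the linked `PyArrowFileIO.parse_location` (treat the exact scheme handling as an assumption and check the linked source):
   ```python
   import os
   from typing import Tuple
   from urllib.parse import urlparse

   def _parse_location(location: str) -> Tuple[str, str, str]:
       # Split a location into (scheme, netloc, path); a bare path is
       # assumed to refer to a local file.
       uri = urlparse(location)
       if not uri.scheme:
           return "file", uri.netloc, os.path.abspath(location)
       elif uri.scheme in ("hdfs", "viewfs"):
           return uri.scheme, uri.netloc, uri.path
       else:
           return uri.scheme, uri.netloc, f"{uri.netloc}{uri.path}"
   ```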



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,192 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+    """Projects an Iceberg Table to a PyArrow construct.

Review Comment:
   is "project" the right word to use here? i think "project" sounds like 
schema projection. this is doing schema projection, row filter, and limits



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,192 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e

Review Comment:
   copied over from [project_table](https://github.com/apache/iceberg-python/blob/3821833c329d217cc4d2514234267bcb2ca1b700/pyiceberg/io/pyarrow.py#L1330-L1345)



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,138 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _project_table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._project_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None
+
+        futures = [
+            executor.submit(
+                _project_table_from_scan_task,
+                task,
+            )
+            for task in tasks
+        ]
+        total_row_count = 0
+        # for consistent ordering, we need to maintain future order
+        futures_index = {f: i for i, f in enumerate(futures)}
+        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+        for future in concurrent.futures.as_completed(futures):
+            completed_futures.add(future)
+            if table_result := future.result():
+                total_row_count += len(table_result)
+            # stop early if limit is satisfied
+            if self._limit is not None and total_row_count >= self._limit:
+                break
+
+        # by now, we've either completed all tasks or satisfied the limit
+        if self._limit is not None:
+            _ = [f.cancel() for f in futures if not f.done()]
+
+        tables = [f.result() for f in completed_futures if f.result()]
+
+        if len(tables) < 1:
+            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+
+        result = pa.concat_tables(tables, promote_options="permissive")
+
+        if self._limit is not None:
+            return result.slice(0, self._limit)
+
+        return result
+
+    def project_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        return self._project_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
+
+    def _project_batches_from_scan_tasks_and_deletes(
+        self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
+    ) -> Iterator[pa.RecordBatch]:
+        limit = self._limit
+        for task in tasks:

Review Comment:
   actually it looks like the `limit` is decremented, so this is fine. and we now have tests to check that `limit` is preserved across multiple files
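   to illustrate the bookkeeping with hypothetical numbers (not taken from the actual tests): with `limit=100` and two batches of 60 rows each, the first batch is yielded whole and `limit` drops to 40, then the second batch is sliced to 40 rows. a standalone sketch of that decrement logic:
   ```python
   from typing import Iterator, List, Optional

   def apply_limit(batch_sizes: List[int], limit: Optional[int]) -> Iterator[int]:
       # mirrors the limit bookkeeping above, with bare row counts
       # standing in for RecordBatches
       for size in batch_sizes:
           if limit is not None:
               if size >= limit:
                   yield limit  # i.e. batch.slice(0, limit)
                   return
               limit -= size
           yield size

   assert list(apply_limit([60, 60], limit=100)) == [60, 40]
   ```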



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,192 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+    """Projects an Iceberg Table to a PyArrow construct.
+
+    Attributes:
+        _table_metadata: Current table metadata of the Iceberg table
+        _io: PyIceberg FileIO implementation from which to fetch the io properties
+        _fs: PyArrow FileSystem to use to read the files
+        _projected_schema: Iceberg Schema to project onto the data files
+        _bound_row_filter: Schema bound row expression to filter the data with
+        _case_sensitive: Case sensitivity when looking up column names
+        _limit: Limit the number of records.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        """Whether to represent data as large arrow types.
+
+        Defaults to True.
+        """
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        """Set of field IDs that should be projected from the data files."""
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        """Project the Iceberg table to a pa.Table.
+
+        Returns a pa.Table with data from the Iceberg table by resolving the
+        right columns that match the current table schema. Only data that
+        matches the provided row_filter expression is returned.
+
+        Args:
+            tasks: FileScanTasks representing the data files and delete files to read from.
+
+        Returns:
+            A PyArrow table. Result is capped at the limit, if specified.

Review Comment:
   nit: total number of rows will be capped if specified



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,192 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+    """Projects an Iceberg Table to a PyArrow construct.
+
+    Attributes:
+        _table_metadata: Current table metadata of the Iceberg table
+        _io: PyIceberg FileIO implementation from which to fetch the io properties
+        _fs: PyArrow FileSystem to use to read the files
+        _projected_schema: Iceberg Schema to project onto the data files
+        _bound_row_filter: Schema bound row expression to filter the data with
+        _case_sensitive: Case sensitivity when looking up column names
+        _limit: Limit the number of records.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        """Whether to represent data as large arrow types.
+
+        Defaults to True.
+        """
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        """Set of field IDs that should be projected from the data files."""
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        """Project the Iceberg table to a pa.Table.
+
+        Returns a pa.Table with data from the Iceberg table by resolving the
+        right columns that match the current table schema. Only data that
+        matches the provided row_filter expression is returned.
+
+        Args:
+            tasks: FileScanTasks representing the data files and delete files to read from.
+
+        Returns:
+            A PyArrow table. Result is capped at the limit, if specified.
+
+        Raises:
+            ResolveError: When a required field cannot be found in the file
+            ValueError: When a field type in the file cannot be projected to the schema type
+        """
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _project_table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._project_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None
+
+        futures = [
+            executor.submit(
+                _project_table_from_scan_task,
+                task,
+            )
+            for task in tasks
+        ]
+        total_row_count = 0
+        # for consistent ordering, we need to maintain future order
+        futures_index = {f: i for i, f in enumerate(futures)}
+        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+        for future in concurrent.futures.as_completed(futures):
+            completed_futures.add(future)
+            if table_result := future.result():
+                total_row_count += len(table_result)
+            # stop early if limit is satisfied
+            if self._limit is not None and total_row_count >= self._limit:
+                break
+
+        # by now, we've either completed all tasks or satisfied the limit
+        if self._limit is not None:
+            _ = [f.cancel() for f in futures if not f.done()]
+
+        tables = [f.result() for f in completed_futures if f.result()]
+
+        if len(tables) < 1:
+            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+
+        result = pa.concat_tables(tables, promote_options="permissive")
+
+        if self._limit is not None:
+            return result.slice(0, self._limit)
+
+        return result
+
+    def project_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
+        """Project the Iceberg table to an Iterator[pa.RecordBatch].
+
+        Returns an Iterator of pa.RecordBatch with data from the Iceberg table
+        by resolving the right columns that match the current table schema.
+        Only data that matches the provided row_filter expression is returned.
+
+        Args:
+            tasks: FileScanTasks representing the data files and delete files to read from.
+
+        Returns:
+            An Iterator of PyArrow RecordBatches. Result is capped at the limit,
+            if specified.

Review Comment:
   nit: total number of rows will be capped if specified (same as above) 



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,192 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+    """Projects an Iceberg Table to a PyArrow construct.
+
+    Attributes:
+        _table_metadata: Current table metadata of the Iceberg table
+        _io: PyIceberg FileIO implementation from which to fetch the io properties
+        _fs: PyArrow FileSystem to use to read the files
+        _projected_schema: Iceberg Schema to project onto the data files
+        _bound_row_filter: Schema bound row expression to filter the data with
+        _case_sensitive: Case sensitivity when looking up column names
+        _limit: Limit the number of records.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file

Review Comment:
   https://github.com/apache/iceberg-python/issues/1041



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1402,6 +1589,11 @@ def project_table(
     return result
 
 
+@deprecated(
+    deprecated_in="0.8.0",
+    removed_in="0.9.0",
+    help_message="project_table is deprecated. Use PyArrowProjector instead.",

Review Comment:
   nit:
   ```suggestion
       help_message="project_table is deprecated. Use 
PyArrowProjector.project_batches instead.",
   ```



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,192 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+    """Projects an Iceberg Table to a PyArrow construct.
+
+    Attributes:
+        _table_metadata: Current table metadata of the Iceberg table
+        _io: PyIceberg FileIO implementation from which to fetch the io properties
+        _fs: PyArrow FileSystem to use to read the files
+        _projected_schema: Iceberg Schema to project onto the data files
+        _bound_row_filter: Schema bound row expression to filter the data with
+        _case_sensitive: Case sensitivity when looking up column names
+        _limit: Limit the number of records.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        """Whether to represent data as large arrow types.
+
+        Defaults to True.
+        """
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        """Set of field IDs that should be projected from the data files."""
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        """Project the Iceberg table to a pa.Table.
+
+        Returns a pa.Table with data from the Iceberg table by resolving the
+        right columns that match the current table schema. Only data that
+        matches the provided row_filter expression is returned.
+
+        Args:
+            tasks: FileScanTasks representing the data files and delete files to read from.
+
+        Returns:
+            A PyArrow table. Result is capped at the limit, if specified.
+
+        Raises:
+            ResolveError: When a required field cannot be found in the file
+            ValueError: When a field type in the file cannot be projected to the schema type
+        """
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _project_table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._project_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None
+
+        futures = [
+            executor.submit(
+                _project_table_from_scan_task,
+                task,
+            )
+            for task in tasks
+        ]
+        total_row_count = 0
+        # for consistent ordering, we need to maintain future order
+        futures_index = {f: i for i, f in enumerate(futures)}
+        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+        for future in concurrent.futures.as_completed(futures):
+            completed_futures.add(future)
+            if table_result := future.result():
+                total_row_count += len(table_result)
+            # stop early if limit is satisfied
+            if self._limit is not None and total_row_count >= self._limit:
+                break
+
+        # by now, we've either completed all tasks or satisfied the limit
+        if self._limit is not None:
+            _ = [f.cancel() for f in futures if not f.done()]
+
+        tables = [f.result() for f in completed_futures if f.result()]
+
+        if len(tables) < 1:
+            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+
+        result = pa.concat_tables(tables, promote_options="permissive")
+
+        if self._limit is not None:
+            return result.slice(0, self._limit)
+
+        return result
+
+    def project_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
+        """Project the Iceberg table to an Iterator[pa.RecordBatch].
+
+        Returns an Iterator of pa.RecordBatch with data from the Iceberg table
+        by resolving the right columns that match the current table schema.
+        Only data that matches the provided row_filter expression is returned.
+

Review Comment:
   nit: add a snippet about why to use this instead of `project_table`
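   for example, something along these lines (illustrative only; `projector`, `tasks`, and `process` are hypothetical names):
   ```python
   # project_batches streams one RecordBatch at a time, so the full result
   # never has to be materialized in memory at once:
   for batch in projector.project_batches(tasks):
       process(batch)  # hypothetical downstream consumer

   # project_table materializes everything into a single pa.Table up front:
   table = projector.project_table(tasks)
   ```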



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

