kevinjqliu commented on code in PR #1043:
URL: https://github.com/apache/iceberg-python/pull/1043#discussion_r1716128868


##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,138 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _project_table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._project_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None
+
+        futures = [
+            executor.submit(
+                _project_table_from_scan_task,
+                task,
+            )
+            for task in tasks
+        ]
+        total_row_count = 0
+        # for consistent ordering, we need to maintain future order
+        futures_index = {f: i for i, f in enumerate(futures)}
+        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+        for future in concurrent.futures.as_completed(futures):
+            completed_futures.add(future)
+            if table_result := future.result():
+                total_row_count += len(table_result)
+            # stop early if limit is satisfied
+            if self._limit is not None and total_row_count >= self._limit:
+                break
+
+        # by now, we've either completed all tasks or satisfied the limit
+        if self._limit is not None:
+            _ = [f.cancel() for f in futures if not f.done()]
+
+        tables = [f.result() for f in completed_futures if f.result()]
+
+        if len(tables) < 1:
+            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+
+        result = pa.concat_tables(tables, promote_options="permissive")
+
+        if self._limit is not None:
+            return result.slice(0, self._limit)
+
+        return result
+
+    def project_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        return self._project_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
+
+    def _project_batches_from_scan_tasks_and_deletes(
+        self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
+    ) -> Iterator[pa.RecordBatch]:
+        limit = self._limit
+        for task in tasks:

Review Comment:
   I think we need to enforce `limit` across the tasks too, similar to #1042.
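
   A minimal sketch of the idea (hypothetical helper, not this PR's code; `enforce_limit` and the batch streams are illustrative):

```python
# Hypothetical sketch: decrement a shared row budget across all tasks'
# batch streams, trimming the final batch so the total never exceeds `limit`.
from typing import Iterable, Iterator, Optional

import pyarrow as pa


def enforce_limit(
    batch_streams: Iterable[Iterator[pa.RecordBatch]], limit: Optional[int]
) -> Iterator[pa.RecordBatch]:
    remaining = limit
    if remaining is not None and remaining <= 0:
        return
    for stream in batch_streams:  # one stream per FileScanTask
        for batch in stream:
            if remaining is None:
                yield batch
            elif batch.num_rows >= remaining:
                yield batch.slice(0, remaining)  # trim the final batch
                return
            else:
                remaining -= batch.num_rows
                yield batch
```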



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,138 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _project_table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._project_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None
+
+        futures = [
+            executor.submit(
+                _project_table_from_scan_task,
+                task,
+            )
+            for task in tasks
+        ]
+        total_row_count = 0
+        # for consistent ordering, we need to maintain future order
+        futures_index = {f: i for i, f in enumerate(futures)}
+        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+        for future in concurrent.futures.as_completed(futures):
+            completed_futures.add(future)
+            if table_result := future.result():
+                total_row_count += len(table_result)
+            # stop early if limit is satisfied
+            if self._limit is not None and total_row_count >= self._limit:
+                break
+
+        # by now, we've either completed all tasks or satisfied the limit
+        if self._limit is not None:
+            _ = [f.cancel() for f in futures if not f.done()]
+
+        tables = [f.result() for f in completed_futures if f.result()]
+
+        if len(tables) < 1:
+            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+
+        result = pa.concat_tables(tables, promote_options="permissive")
+
+        if self._limit is not None:
+            return result.slice(0, self._limit)
+
+        return result
+
+    def project_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        return self._project_batches_from_scan_tasks_and_deletes(tasks, deletes_per_file)
+
+    def _project_batches_from_scan_tasks_and_deletes(
+        self, tasks: Iterable[FileScanTask], deletes_per_file: Dict[str, List[ChunkedArray]]
+    ) -> Iterator[pa.RecordBatch]:
+        limit = self._limit
+        for task in tasks:
+            batches = _task_to_record_batches(

Review Comment:
   I like that `_task_to_record_batches` is kept the same, as a generator, and that this function pulls one batch at a time while handling the `limit` logic.
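
   A toy illustration of why the generator shape matters; `fake_task_batches` below is a stand-in for real file I/O, not anything in this PR:

```python
# Each `yield` stands in for reading one batch from a data file; a
# consumer that stops early never triggers the remaining reads.
import pyarrow as pa


def fake_task_batches():
    for start in range(0, 100, 10):
        print(f"reading rows {start}..{start + 9}")  # stands in for file I/O
        yield pa.RecordBatch.from_pydict({"x": list(range(start, start + 10))})


batches = fake_task_batches()
first = next(batches)        # only the first "read" happens
assert first.num_rows == 10  # the other nine batches were never produced
```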



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,138 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:

Review Comment:
   Right now, `project_table` parallelizes reading across tasks and `project_batches` does not.
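
   For reference, a self-contained contrast of the two shapes; `read_parallel` and `read_sequential` are illustrative stand-ins, not the PR's internals:

```python
# `read_parallel` mirrors project_table's fan-out: one future per task,
# results concatenated. `read_sequential` mirrors project_batches:
# one task at a time, yielding as it goes.
import itertools
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, List


def read_parallel(tasks: List[str], read: Callable[[str], List[int]]) -> List[int]:
    with ThreadPoolExecutor() as pool:
        return list(itertools.chain.from_iterable(pool.map(read, tasks)))


def read_sequential(tasks: Iterable[str], read: Callable[[str], List[int]]) -> Iterator[int]:
    for task in tasks:
        yield from read(task)
```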



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,138 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _project_table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._project_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None

Review Comment:
   nit: is returning `None` here allowed? Or can we return `pa.Table.from_batches([])`?
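
   For the record, a quick check (the exact error message may vary by PyArrow version): `from_batches` with an empty list needs an explicit schema, which is presumably why the diff builds the empty table via `schema_to_pyarrow(...)`:

```python
import pyarrow as pa

try:
    pa.Table.from_batches([])  # no batches and no schema
except ValueError as err:
    print(err)  # PyArrow requires a schema, or at least one RecordBatch

schema = pa.schema([("x", pa.int64())])
empty = pa.Table.from_batches([], schema=schema)  # works: a 0-row table
assert empty.num_rows == 0
```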



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,138 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _project_table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._project_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None
+
+        futures = [
+            executor.submit(
+                _project_table_from_scan_task,
+                task,

Review Comment:
   This is where #1057 can come in: we pass the `limit` down to the task scan level (a sketch follows the list).
   * if `limit` > `len(scan_task)`, return as normal
   * if `limit` < `len(scan_task)`, the scan task will return fewer batches
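
   A sketch of how that push-down could plug into the fan-out; `scan_task` and `project_with_pushdown` are hypothetical stand-ins. Each parallel task gets the full limit, since any single file alone may satisfy it, and the caller still trims after concatenation:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Optional

import pyarrow as pa


def scan_task(path: str, limit: Optional[int]) -> pa.Table:
    # Stand-in for a per-task reader that stops after `limit` rows.
    table = pa.table({"x": list(range(1000)), "file": [path] * 1000})
    return table.slice(0, limit) if limit is not None else table


def project_with_pushdown(paths: List[str], limit: Optional[int]) -> pa.Table:
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(scan_task, p, limit) for p in paths]
        result = pa.concat_tables([f.result() for f in futures])
    # Per-task limits bound the work per file; the global cut still happens here.
    return result.slice(0, limit) if limit is not None else result
```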



##########
pyiceberg/io/pyarrow.py:
##########
@@ -1308,6 +1309,138 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, 
got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: 
{io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:

Review Comment:
   From an API standpoint, what's the difference between `project_table` and `project_batches`? Is one supposed to be more memory-efficient?
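
   For intuition, a toy illustration of the trade-off (not this PR's API): streaming keeps roughly one batch resident, while materializing pins them all:

```python
import pyarrow as pa


def batch_stream():
    for start in range(0, 20, 5):
        yield pa.RecordBatch.from_pydict({"x": list(range(start, start + 5))})


# project_batches-style: consume and drop one batch at a time.
streamed_rows = sum(batch.num_rows for batch in batch_stream())

# project_table-style: all batches resident at once inside one Table.
table = pa.Table.from_batches(list(batch_stream()))
assert table.num_rows == streamed_rows == 20
```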



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

