sungwy commented on code in PR #1043:
URL: https://github.com/apache/iceberg-python/pull/1043#discussion_r1717833002
##########
pyiceberg/io/pyarrow.py:
##########
@@ -1304,6 +1305,195 @@ def _read_all_delete_files(fs: FileSystem, tasks: Iterable[FileScanTask]) -> Dic
     return deletes_per_file
 
 
+def _fs_from_file_path(file_path: str, io: FileIO) -> FileSystem:
+    scheme, netloc, _ = _parse_location(file_path)
+    if isinstance(io, PyArrowFileIO):
+        return io.fs_by_scheme(scheme, netloc)
+    else:
+        try:
+            from pyiceberg.io.fsspec import FsspecFileIO
+
+            if isinstance(io, FsspecFileIO):
+                from pyarrow.fs import PyFileSystem
+
+                return PyFileSystem(FSSpecHandler(io.get_fs(scheme)))
+            else:
+                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}")
+        except ModuleNotFoundError as e:
+            # When FsSpec is not installed
+            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {io}") from e
+
+
+class PyArrowProjector:
+    _table_metadata: TableMetadata
+    _io: FileIO
+    _fs: FileSystem
+    _projected_schema: Schema
+    _bound_row_filter: BooleanExpression
+    _case_sensitive: bool
+    _limit: Optional[int]
+    """Projects an Iceberg Table to a PyArrow construct.
+
+    Attributes:
+        _table_metadata: Current table metadata of the Iceberg table
+        _io: PyIceberg FileIO implementation from which to fetch the io properties
+        _fs: PyArrow FileSystem to use to read the files
+        _projected_schema: Iceberg Schema to project onto the data files
+        _bound_row_filter: Schema bound row expression to filter the data with
+        _case_sensitive: Case sensitivity when looking up column names
+        _limit: Limit the number of records.
+    """
+
+    def __init__(
+        self,
+        table_metadata: TableMetadata,
+        io: FileIO,
+        projected_schema: Schema,
+        row_filter: BooleanExpression,
+        case_sensitive: bool = True,
+        limit: Optional[int] = None,
+    ) -> None:
+        self._table_metadata = table_metadata
+        self._io = io
+        self._fs = _fs_from_file_path(table_metadata.location, io)  # TODO: use different FileSystem per file
+        self._projected_schema = projected_schema
+        self._bound_row_filter = bind(table_metadata.schema(), row_filter, case_sensitive=case_sensitive)
+        self._case_sensitive = case_sensitive
+        self._limit = limit
+
+    @property
+    def _use_large_types(self) -> bool:
+        """Whether to represent data as large arrow types.
+
+        Defaults to True.
+        """
+        return property_as_bool(self._io.properties, PYARROW_USE_LARGE_TYPES_ON_READ, True)
+
+    @property
+    def _projected_field_ids(self) -> Set[int]:
+        """Set of field IDs that should be projected from the data files."""
+        return {
+            id
+            for id in self._projected_schema.field_ids
+            if not isinstance(self._projected_schema.find_type(id), (MapType, ListType))
+        }.union(extract_field_ids(self._bound_row_filter))
+
+    def project_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:
+        """Project the Iceberg table to a pa.Table.
+
+        Returns a pa.Table with data from the Iceberg table by resolving the
+        right columns that match the current table schema. Only data that
+        matches the provided row_filter expression is returned.
+
+        Args:
+            tasks: FileScanTasks representing the data files and delete files to read from.
+
+        Returns:
+            A PyArrow table. Result is capped at the limit, if specified.
+
+        Raises:
+            ResolveError: When a required field cannot be found in the file
+            ValueError: When a field type in the file cannot be projected to the schema type
+        """
+        deletes_per_file = _read_all_delete_files(self._fs, tasks)
+        executor = ExecutorFactory.get_or_create()
+
+        def _project_table_from_scan_task(task: FileScanTask) -> pa.Table:
+            batches = list(self._project_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
+            if len(batches) > 0:
+                return pa.Table.from_batches(batches)
+            else:
+                return None
+
+        futures = [
+            executor.submit(
+                _project_table_from_scan_task,
+                task,
+            )
+            for task in tasks
+        ]
+        total_row_count = 0
+        # for consistent ordering, we need to maintain future order
+        futures_index = {f: i for i, f in enumerate(futures)}
+        completed_futures: SortedList[Future[pa.Table]] = SortedList(iterable=[], key=lambda f: futures_index[f])
+        for future in concurrent.futures.as_completed(futures):
+            completed_futures.add(future)
+            if table_result := future.result():
+                total_row_count += len(table_result)
+            # stop early if limit is satisfied
+            if self._limit is not None and total_row_count >= self._limit:
+                break
+
+        # by now, we've either completed all tasks or satisfied the limit
+        if self._limit is not None:
+            _ = [f.cancel() for f in futures if not f.done()]
+
+        tables = [f.result() for f in completed_futures if f.result()]
+
+        if len(tables) < 1:
+            return pa.Table.from_batches([], schema=schema_to_pyarrow(self._projected_schema, include_field_ids=False))
+
+        result = pa.concat_tables(tables, promote_options="permissive")

Review Comment:
Thank you for chiming in with your arrow expertise @corleyma. Always appreciative of your input here. That's consistent with my understanding as well: unless we have to cast the data to different types, the operations should be zero-copy.

For the record, the refactoring should not introduce any behavior change, and here is the rationale for why things are currently done the way they are:

- The ThreadPoolExecutor tasks convert record batches to tables, and those tables are then concatenated into a single table. This parallelizes the work of eagerly materializing the data in the Parquet files. If we instead returned iterators of record batches and built a pa.Table from that lazy iterator, we would not benefit from the parallelization.
- When using pa.Table, we can use 'permissive' mode on concatenation, which means types are upcast only when there are discrepancies between the tables (such as large vs. small types). When creating a pa.Table from RecordBatches, the schemas read from different files must all match, which can be a challenge with small vs. large types (see the sketch below): https://arrow.apache.org/docs/python/generated/pyarrow.Table.html

Given the above reasons, we materialize record batches as a pa.Table in each thread, where a thread corresponds to scanning an individual data file, and then concatenate the tables into one.
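To make the second point concrete, here is a minimal sketch (not part of the PR diff; the table names are illustrative, and it assumes a pyarrow version with `promote_options` support, i.e. >= 14.0) of how the two construction paths behave when per-file schemas differ only in string width:

```python
import pyarrow as pa

# Two per-file results whose schemas differ only in string width
# (e.g. one file materialized with small types, another with large types).
small = pa.table({"name": pa.array(["a", "b"], type=pa.string())})
large = pa.table({"name": pa.array(["c"], type=pa.large_string())})

# Table.from_batches requires every batch to share one schema,
# so mixing the two raises ArrowInvalid.
try:
    pa.Table.from_batches(small.to_batches() + large.to_batches())
except pa.ArrowInvalid as exc:
    print(f"from_batches failed: {exc}")

# concat_tables in 'permissive' mode unifies the schemas instead,
# upcasting string -> large_string only because the types differ.
combined = pa.concat_tables([small, large], promote_options="permissive")
print(combined.schema)
```

This is the same reason the per-file results are materialized as pa.Table (rather than kept as RecordBatches) before the final `pa.concat_tables` call.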