grobgl commented on issue #5800:
URL: https://github.com/apache/iceberg/issues/5800#issuecomment-1553073273
I'm keen to push this forward. @TomAugspurger's implementation works in
single-threaded mode but fails in a distributed scenario due to the current
lack of pickle support (I raised a separate issue for this: #7644).
Extending Tom's approach, here is a solution that utilises Dask's
`from_map` and `DataFrameIOFunction`, which allows us to pass the projected
columns down to the Parquet reader:
```python
# Imports inferred from context: pyiceberg internals (as of the 0.x API)
# and Dask's DataFrameIOFunction protocol.
from typing import List, Sequence

import dask.dataframe as dd
import pandas as pd
from dask.dataframe.io.utils import DataFrameIOFunction
from pyarrow.fs import FileSystem, FSSpecHandler, PyFileSystem
from pyiceberg.expressions import BooleanExpression
from pyiceberg.expressions.visitors import bind, extract_field_ids
from pyiceberg.io.pyarrow import PyArrowFileIO, _file_to_table, schema_to_pyarrow
from pyiceberg.schema import Schema
from pyiceberg.table import DataScan, FileScanTask
from pyiceberg.types import ListType, MapType


class IcebergFunctionWrapper(DataFrameIOFunction):
    def __init__(
        self,
        fs: FileSystem,
        bound_row_filter: BooleanExpression,
        projected_schema: Schema,
        case_sensitive: bool,
    ):
        self._fs = fs
        self._bound_row_filter = bound_row_filter
        self._projected_schema = projected_schema
        self._case_sensitive = case_sensitive
        # Field IDs the reader needs: projected fields (excluding nested
        # map/list types) plus any fields referenced by the row filter.
        self._projected_field_ids = {
            id
            for id in projected_schema.field_ids
            if not isinstance(projected_schema.find_type(id), (MapType, ListType))
        }.union(extract_field_ids(bound_row_filter))
        super().__init__()

    @property
    def columns(self) -> List[str]:
        return self._projected_schema.column_names

    @property
    def empty_table(self) -> pd.DataFrame:
        return schema_to_pyarrow(self._projected_schema).empty_table().to_pandas(date_as_object=False)

    def project_columns(self, columns: Sequence[str]) -> 'IcebergFunctionWrapper':
        """Called by Dask's optimiser to push a column projection into the reader."""
        if list(columns) == self.columns:
            return self
        return IcebergFunctionWrapper(
            self._fs,
            self._bound_row_filter,
            self._projected_schema.select(*columns),
            self._case_sensitive,
        )

    def __call__(self, task: FileScanTask) -> pd.DataFrame:
        table = _file_to_table(
            self._fs,
            task,
            self._bound_row_filter,
            self._projected_schema,
            self._projected_field_ids,
            self._case_sensitive,
            0,  # no limit support yet
        )
        if table is None:
            return self.empty_table
        return table.to_pandas(date_as_object=False)


def to_dask_dataframe(scan: DataScan) -> dd.DataFrame:
    tasks = scan.plan_files()
    table = scan.table
    row_filter = scan.row_filter
    projected_schema = scan.projection()
    case_sensitive = scan.case_sensitive

    # Resolve a PyArrow filesystem from the table's FileIO.
    scheme, _ = PyArrowFileIO.parse_location(table.location())
    if isinstance(table.io, PyArrowFileIO):
        fs = table.io.get_fs(scheme)
    else:
        try:
            from pyiceberg.io.fsspec import FsspecFileIO

            if isinstance(table.io, FsspecFileIO):
                fs = PyFileSystem(FSSpecHandler(table.io.get_fs(scheme)))
            else:
                raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}")
        except ModuleNotFoundError as e:
            # When fsspec is not installed
            raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}") from e

    bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)
    io_func = IcebergFunctionWrapper(fs, bound_row_filter, projected_schema, case_sensitive)

    # One Dask partition per Iceberg file scan task.
    return dd.from_map(
        io_func,
        tasks,
        meta=io_func.empty_table,
        enforce_metadata=False,
    )
```
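For reference, usage would look roughly like this (a minimal sketch; the catalog name, table identifier, filter, and column names are placeholders, not taken from the snippet above):
```python
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

# hypothetical catalog and table names, purely for illustration
catalog = load_catalog("default")
table = catalog.load_table("examples.events")

scan = table.scan(
    row_filter=GreaterThanOrEqual("event_ts", "2023-01-01T00:00:00"),
    selected_fields=("event_ts", "user_id"),
)

ddf = to_dask_dataframe(scan)
print(ddf.head())  # reads only the matching files and only the selected columns
```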
I'm also looking into adding `divisions` support and row-group-level
parallelisation to this.
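As a rough sketch of the row-group idea (for discussion only: it assumes each task's data file is readable through the same PyArrow filesystem, and `split_by_row_group` plus the `(task, row_group)` pairing are hypothetical, not part of the current API):
```python
import pyarrow.parquet as pq

def split_by_row_group(fs: FileSystem, task: FileScanTask) -> list:
    """Expand one FileScanTask into one unit of work per Parquet row group."""
    with fs.open_input_file(task.file.file_path) as f:
        num_row_groups = pq.ParquetFile(f).metadata.num_row_groups
    # Each (task, row_group_index) pair would become its own Dask partition;
    # the IO function would then pass `row_groups=[i]` to the Parquet reader.
    return [(task, i) for i in range(num_row_groups)]
```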
Generally, should this be part of the Dask library instead of PyIceberg?