grobgl commented on issue #5800:
URL: https://github.com/apache/iceberg/issues/5800#issuecomment-1553073273

   I'm keen to push this forward. @TomAugspurger's implementation works in 
single-threaded mode but fails in a distributed scenario due to the current 
lack of pickle support (I raised a separate issue: #7644).
   
   Extending Tom's approach, here is a solution that uses Dask's `from_map` 
together with its `DataFrameIOFunction` protocol, which lets us pass the 
projected columns down to the Parquet reader:
   ```python
   from typing import List, Sequence

   import dask.dataframe as dd
   import pandas as pd
   from dask.dataframe.io.utils import DataFrameIOFunction
   from pyarrow.fs import FileSystem, FSSpecHandler, PyFileSystem

   from pyiceberg.expressions import BooleanExpression
   from pyiceberg.expressions.visitors import bind, extract_field_ids
   from pyiceberg.io.pyarrow import PyArrowFileIO, _file_to_table, schema_to_pyarrow
   from pyiceberg.schema import Schema
   from pyiceberg.table import DataScan, FileScanTask
   from pyiceberg.types import ListType, MapType


   class IcebergFunctionWrapper(DataFrameIOFunction):
       def __init__(
           self,
           fs: FileSystem,
           bound_row_filter: BooleanExpression,
           projected_schema: Schema,
           case_sensitive: bool,
       ):
           self._fs = fs
           self._bound_row_filter = bound_row_filter
           self._projected_schema = projected_schema
           self._case_sensitive = case_sensitive
           self._projected_field_ids = {
               id for id in projected_schema.field_ids
               if not isinstance(projected_schema.find_type(id), (MapType, ListType))
           }.union(extract_field_ids(bound_row_filter))
           super().__init__()
       
       @property
       def columns(self) -> List[str]:
           return self._projected_schema.column_names
   
       @property
       def empty_table(self) -> pd.DataFrame:
           return schema_to_pyarrow(self._projected_schema).empty_table().to_pandas(date_as_object=False)
   
       def project_columns(self, columns: Sequence[str]) -> 'IcebergFunctionWrapper':
           if list(columns) == self.columns:
               return self
   
           return IcebergFunctionWrapper(
               self._fs,
               self._bound_row_filter,
               self._projected_schema.select(*columns),
               self._case_sensitive,
           )
   
       def __call__(self, task: FileScanTask) -> pd.DataFrame:
           table = _file_to_table(
               self._fs,
               task,
               self._bound_row_filter,
               self._projected_schema,
               self._projected_field_ids,
               self._case_sensitive,
               0,  # no limit support yet
           )
   
           if table is None:
               return self.empty_table
   
           return table.to_pandas(date_as_object=False)
   
   
   def to_dask_dataframe(scan: DataScan) -> dd.DataFrame:
       tasks = scan.plan_files()
       table = scan.table
       row_filter = scan.row_filter
       projected_schema = scan.projection()
       case_sensitive = scan.case_sensitive
   
       scheme, _ = PyArrowFileIO.parse_location(table.location())
       if isinstance(table.io, PyArrowFileIO):
           fs = table.io.get_fs(scheme)
       else:
           try:
               from pyiceberg.io.fsspec import FsspecFileIO
   
               if isinstance(table.io, FsspecFileIO):
                   fs = PyFileSystem(FSSpecHandler(table.io.get_fs(scheme)))
               else:
                   raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}")
           except ModuleNotFoundError as e:
               # When FsSpec is not installed
               raise ValueError(f"Expected PyArrowFileIO or FsspecFileIO, got: {table.io}") from e
   
       bound_row_filter = bind(table.schema(), row_filter, case_sensitive=case_sensitive)
   
       io_func = IcebergFunctionWrapper(fs, bound_row_filter, projected_schema, case_sensitive)
   
       return dd.from_map(
           io_func,
           tasks,
           meta=io_func.empty_table,
           enforce_metadata=False,
       )
   ```
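
   For reference, here's a minimal usage sketch (the catalog name and table 
identifier below are made up for illustration, and this assumes the snippet 
above is importable):
   ```python
   from pyiceberg.catalog import load_catalog

   # "default" and "examples.taxi_dataset" are hypothetical names
   catalog = load_catalog("default")
   table = catalog.load_table("examples.taxi_dataset")

   # column projection is pushed down to the Parquet reader via project_columns
   ddf = to_dask_dataframe(table.scan(selected_fields=("trip_distance", "fare_amount")))
   print(ddf.head())
   ```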
   I'm also looking into adding `divisions` support and row-group-level 
parallelisation to this.
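
   For the row-group-level part, the rough direction would be to expand each 
`FileScanTask` into one Dask task per Parquet row group, along these lines (a 
sketch only; `_read_row_group` is a hypothetical helper, not PyIceberg API, 
and it skips row filtering and field-id-based projection):
   ```python
   import pyarrow.parquet as pq
   from pyarrow.fs import FileSystem

   def _read_row_group(fs: FileSystem, path: str, row_group: int, columns: List[str]) -> pd.DataFrame:
       # each Dask partition reads a single row group instead of a whole file
       with fs.open_input_file(path) as f:
           return pq.ParquetFile(f).read_row_group(row_group, columns=columns).to_pandas()
   ```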
   
   Generally, should this be part of the Dask library instead of PyIceberg?

