Fokko commented on code in PR #6997:
URL: https://github.com/apache/iceberg/pull/6997#discussion_r1177504705
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -507,11 +709,8 @@ def _file_to_table(
    schema_raw = None
    if metadata := physical_schema.metadata:
        schema_raw = metadata.get(ICEBERG_SCHEMA)
-    if schema_raw is None:
-        raise ValueError(
-            "Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505"
-        )
-    file_schema = Schema.parse_raw(schema_raw)
+    # TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema
Review Comment:
Should we create an issue for this, and still raise an exception?
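For illustration, a minimal sketch (reusing the names from the hunk above) of what keeping the exception next to the TODO could look like until Name Mapping support lands; this is not the PR's code, just one possible shape:
```python
schema_raw = None
if metadata := physical_schema.metadata:
    schema_raw = metadata.get(ICEBERG_SCHEMA)

# TODO: if field_ids are not present, Name Mapping should be implemented to look them up in the table schema
if schema_raw is None:
    # Keep failing loudly until the Name Mapping fallback exists
    raise ValueError(
        "Iceberg schema is not embedded into the Parquet file, see https://github.com/apache/iceberg/issues/6505"
    )
file_schema = Schema.parse_raw(schema_raw)
```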
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -486,6 +499,195 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
    return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+    visitor = _ConvertToIceberg()
+    return visit_pyarrow(schema, visitor)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType | pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> T:
+    """A generic function for applying a pyarrow schema visitor to any point within a schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        obj (pa.DataType | pa.Schema): An instance of a pyarrow Schema or DataType
+        visitor (PyArrowSchemaVisitor[T]): An instance of an implementation of the generic PyArrowSchemaVisitor base class
+
+    Raises:
+        NotImplementedError: If attempting to visit an unrecognized object type
+    """
+    raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.Schema)
+def _(obj: pa.Schema, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+    struct_results: List[Optional[T]] = []
+    for field in obj:
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+
+    return visitor.schema(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+    struct_results: List[Optional[T]] = []
+    for field in obj:
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+
+    return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+    visitor.before_field(obj.value_field)
+    list_result = visit_pyarrow(obj.value_field.type, visitor)
+    visitor.after_field(obj.value_field)
+    return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+    visitor.before_field(obj.key_field)
+    key_result = visit_pyarrow(obj.key_field.type, visitor)
+    visitor.after_field(obj.key_field)
+    visitor.before_field(obj.item_field)
+    value_result = visit_pyarrow(obj.item_field.type, visitor)
+    visitor.after_field(obj.item_field)
+    return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyArrowSchemaVisitor[T]) -> Optional[T]:
+    if pa.types.is_nested(obj):
+        raise TypeError(f"Expected primitive type, got: {type(obj)}")
+    return visitor.primitive(obj)
+
+
+class PyArrowSchemaVisitor(Generic[T], ABC):
+    def before_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately before visiting a field."""
+
+    def after_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately after visiting a field."""
+
+    @abstractmethod
+    def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) -> Optional[T]:
+        """visit a schema"""
+
+    @abstractmethod
+    def struct(self, struct: pa.StructType, field_results: List[Optional[T]]) -> Optional[T]:
+        """visit a struct"""
+
+    @abstractmethod
+    def list(self, list_type: pa.ListType, element_result: Optional[T]) -> Optional[T]:
+        """visit a list"""
+
+    @abstractmethod
+    def map(self, map_type: pa.MapType, key_result: Optional[T], value_result: Optional[T]) -> Optional[T]:
+        """visit a map"""
+
+    @abstractmethod
+    def primitive(self, primitive: pa.DataType) -> Optional[T]:
+        """visit a primitive type"""
+
+
+def _get_field_id(field: pa.Field) -> Optional[int]:
+    for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+        if field_id_str := field.metadata.get(pyarrow_field_id_key):
+            return int(field_id_str.decode())
+    return None
+
+
+def _get_field_doc(field: pa.Field) -> Optional[str]:
+    for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+        if doc_str := field.metadata.get(pyarrow_doc_key):
+            return doc_str.decode()
+    return None
+
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+    def schema(self, schema: pa.Schema, field_results: List[Optional[IcebergType]]) -> Schema:
Review Comment:
I noticed that there is quite a bit of duplication between `schema` and `struct`; how do you feel about:
```python
    def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: List[Optional[IcebergType]]) -> List[NestedField]:
        fields = []
        for i, field in enumerate(arrow_fields):
            field_id = _get_field_id(field)
            field_doc = _get_field_doc(field)
            field_type = field_results[i]
            if field_type is not None and field_id is not None:
                fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable, doc=field_doc))
        return fields

    def schema(self, schema: pa.Schema, field_results: List[Optional[IcebergType]]) -> Schema:
        return Schema(*self._convert_fields(schema, field_results))

    def struct(self, struct: pa.StructType, field_results: List[Optional[IcebergType]]) -> IcebergType:
        return StructType(*self._convert_fields(struct, field_results))
```
Ran the type checker and tests, and this works 👍🏻
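For a quick sanity check of the converter, a usage sketch (not part of the PR): the field names and IDs below are made up, and it assumes `PARQUET:field_id` is among the metadata keys that `PYARROW_FIELD_ID_KEYS` / `_get_field_id` check, as it is in the earlier revision of this code:
```python
import pyarrow as pa

from pyiceberg.io.pyarrow import pyarrow_to_schema

# An Arrow schema whose fields carry Iceberg field IDs in their metadata,
# the way Parquet-written files expose them.
arrow_schema = pa.schema(
    [
        pa.field("id", pa.int64(), nullable=False, metadata={"PARQUET:field_id": "1"}),
        pa.field("data", pa.string(), nullable=True, metadata={"PARQUET:field_id": "2"}),
    ]
)

iceberg_schema = pyarrow_to_schema(arrow_schema)
print(iceberg_schema)  # expected: required long "id" (field 1), optional string "data" (field 2)
```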
##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
    return boolean_expression_visit(expr, _ConvertToArrowExpression())
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+    visitor = _ConvertToIceberg()
+    struct_results = []
+    for i in range(len(schema.names)):
+        field = schema.field(i)
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+    return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    """A generic function for applying a pyarrow schema visitor to any point within a schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        obj (pa.DataType): An instance of a pyarrow DataType
+        visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of the generic PyarrowSchemaVisitor base class
+
+    Raises:
+        NotImplementedError: If attempting to visit an unrecognized object type
+    """
+    raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    struct_results = []
+    for field in obj:
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+
+    return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_list_element(obj.value_field)
+    list_result = visit_pyarrow(obj.value_field.type, visitor)
+    visitor.after_list_element(obj.value_field)
+    return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_map_key(obj.key_field)
+    key_result = visit_pyarrow(obj.key_field.type, visitor)
+    visitor.after_map_key(obj.key_field)
+    visitor.before_map_value(obj.item_field)
+    value_result = visit_pyarrow(obj.item_field.type, visitor)
+    visitor.after_map_value(obj.item_field)
+    return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    if pa.types.is_nested(obj):
+        raise TypeError(f"Expected primitive type, got {type(obj)}")
+    return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):
+    def before_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately before visiting a field."""
+
+    def after_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately after visiting a field."""
+
+    def before_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately before visiting a list element."""
+
+    def after_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately after visiting a list element."""
+
+    def before_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately before visiting a map key."""
+
+    def after_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately after visiting a map key."""
+
+    def before_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately before visiting a map value."""
+
+    def after_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately after visiting a map value."""
+
+    @abstractmethod
+    def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema:
+        """visit a schema"""
+
+    @abstractmethod
+    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
+        """visit a struct"""
+
+    @abstractmethod
+    def list(self, list_type: pa.ListType, element_result: T) -> T:
+        """visit a list"""
+
+    @abstractmethod
+    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
+        """visit a map"""
+
+    @abstractmethod
+    def primitive(self, primitive: pa.DataType) -> T:
+        """visit a primitive type"""
+
+
+def _get_field_id(field: pa.Field) -> int:
+    if field.metadata is not None:
+        field_metadata = {k.decode(): v.decode() for k, v in field.metadata.items()}
+        if field_id := field_metadata.get("PARQUET:field_id"):
+            return int(field_id)
+    raise ValueError(f"Field {field.name} does not have a field_id")
+
+
+class _ConvertToIceberg(PyarrowSchemaVisitor[IcebergType], ABC):
+    def schema(self, schema: pa.Schema, field_results: List[IcebergType]) -> Schema:
+        fields = []
+        for i in range(len(schema.names)):
+            field = schema.field(i)
+            field_id = _get_field_id(field)
+            field_type = field_results[i]
+            if field_type is not None:
+                fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable))
+        return Schema(*fields)
+
+    def struct(self, struct: pa.StructType, field_results: List[IcebergType]) -> IcebergType:
+        fields = []
+        for i in range(struct.num_fields):
+            field = struct[i]
+            field_id = _get_field_id(field)
+            # may need to check doc strings
+            field_type = field_results[i]
+            if field_type is not None:
+                fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable))
+        return StructType(*fields)
+
+    def list(self, list_type: pa.ListType, element_result: IcebergType) -> IcebergType:
+        element_field = list_type.value_field
+        element_id = _get_field_id(element_field)
+        if element_result is not None:
+            return ListType(element_id, element_result, element_required=not element_field.nullable)
+        raise ValueError(f"List type must have element field: {list_type}")
+
+    def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: IcebergType) -> IcebergType:
+        key_field = map_type.key_field
+        key_id = _get_field_id(key_field)
+        value_field = map_type.item_field
+        value_id = _get_field_id(value_field)
+        if key_result is not None and value_result is not None:
+            return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable)
+        raise ValueError(f"Map type must have key and value fields: {map_type}")
+
+    def primitive(self, primitive: pa.DataType) -> IcebergType:
+        if pa.types.is_boolean(primitive):
+            return BooleanType()
+        elif pa.types.is_int32(primitive) or pa.types.is_uint32(primitive):
+            return IntegerType()
+        elif pa.types.is_int64(primitive) or pa.types.is_uint64(primitive):
+            return LongType()
+        elif pa.types.is_float32(primitive):
+            return FloatType()
+        elif pa.types.is_float64(primitive):
+            return DoubleType()
+        elif pa.types.is_decimal(primitive):
+            if isinstance(primitive, pa.Decimal256Type):
+                primitive = cast(pa.Decimal256Type, primitive)
+            else:
+                primitive = cast(pa.Decimal128Type, primitive)
+            return DecimalType(primitive.precision, primitive.scale)
+        elif pa.types.is_string(primitive):
+            return StringType()
+        elif pa.types.is_date(primitive):
+            return DateType()
+        elif pa.types.is_time(primitive):
Review Comment:
If there is an Iceberg schema in the file, I think we can assume that it is
written according to the spec:

With the current check, it is correct:
```python
        elif pa.types.is_time(primitive):
            if isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
                return TimeType()
```
We could even simplify it:
```python
        elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
            return TimeType()
```
The tests are in place:
```python
def test_pyarrow_time32_to_iceberg() -> None:
    pyarrow_type = pa.time32("ms")
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[ms]")):
        visit_pyarrow(pyarrow_type, _ConvertToIceberg())
    pyarrow_type = pa.time32("s")
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time32[s]")):
        visit_pyarrow(pyarrow_type, _ConvertToIceberg())


def test_pyarrow_time64_us_to_iceberg() -> None:
    pyarrow_type = pa.time64("us")
    converted_iceberg_type = visit_pyarrow(pyarrow_type, _ConvertToIceberg())
    assert converted_iceberg_type == TimeType()
    assert visit(converted_iceberg_type, _ConvertToArrowSchema()) == pyarrow_type


def test_pyarrow_time64_ns_to_iceberg() -> None:
    pyarrow_type = pa.time64("ns")
    with pytest.raises(TypeError, match=re.escape("Unsupported type: time64[ns]")):
        visit_pyarrow(pyarrow_type, _ConvertToIceberg())
```
I think we can resolve this issue
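For completeness, a tiny standalone sketch of that simplified check (the helper name `_time_to_iceberg` is illustrative, not part of the PR); the fallthrough mirrors the `Unsupported type` error the tests above match on:
```python
import pyarrow as pa

from pyiceberg.types import IcebergType, TimeType


def _time_to_iceberg(primitive: pa.DataType) -> IcebergType:
    # Iceberg's time type is microsecond precision, so only time64[us] maps cleanly.
    if isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
        return TimeType()
    # time32[s], time32[ms] and time64[ns] all end up here.
    raise TypeError(f"Unsupported type: {primitive}")
```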