JonasJ-ap commented on code in PR #6997:
URL: https://github.com/apache/iceberg/pull/6997#discussion_r1174199419


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -476,6 +483,202 @@ def expression_to_pyarrow(expr: BooleanExpression) -> pc.Expression:
     return boolean_expression_visit(expr, _ConvertToArrowExpression())
 
 
+def pyarrow_to_schema(schema: pa.Schema) -> Schema:
+    visitor = _ConvertToIceberg()
+    struct_results = []
+    for i in range(len(schema.names)):
+        field = schema.field(i)
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+    return visitor.schema(schema, struct_results)
+
+
+@singledispatch
+def visit_pyarrow(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    """A generic function for applying a pyarrow schema visitor to any point 
within a schema
+
+    The function traverses the schema in post-order fashion
+
+    Args:
+        obj (pa.DataType): An instance of a pyarrow DataType
+        visitor (PyarrowSchemaVisitor[T]): An instance of an implementation of the generic PyarrowSchemaVisitor base class
+
+    Raises:
+        NotImplementedError: If attempting to visit an unrecognized object type
+    """
+    raise NotImplementedError("Cannot visit non-type: %s" % obj)
+
+
+@visit_pyarrow.register(pa.StructType)
+def _(obj: pa.StructType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    struct_results = []
+    for field in obj:
+        visitor.before_field(field)
+        struct_result = visit_pyarrow(field.type, visitor)
+        visitor.after_field(field)
+        struct_results.append(struct_result)
+
+    return visitor.struct(obj, struct_results)
+
+
+@visit_pyarrow.register(pa.ListType)
+def _(obj: pa.ListType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_list_element(obj.value_field)
+    list_result = visit_pyarrow(obj.value_field.type, visitor)
+    visitor.after_list_element(obj.value_field)
+    return visitor.list(obj, list_result)
+
+
+@visit_pyarrow.register(pa.MapType)
+def _(obj: pa.MapType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    visitor.before_map_key(obj.key_field)
+    key_result = visit_pyarrow(obj.key_field.type, visitor)
+    visitor.after_map_key(obj.key_field)
+    visitor.before_map_value(obj.item_field)
+    value_result = visit_pyarrow(obj.item_field.type, visitor)
+    visitor.after_map_value(obj.item_field)
+    return visitor.map(obj, key_result, value_result)
+
+
+@visit_pyarrow.register(pa.DataType)
+def _(obj: pa.DataType, visitor: PyarrowSchemaVisitor[T]) -> T:
+    if pa.types.is_nested(obj):
+        raise TypeError(f"Expected primitive type, got {type(obj)}")
+    return visitor.primitive(obj)
+
+
+class PyarrowSchemaVisitor(Generic[T], ABC):
+    def before_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a field."""
+
+    def after_field(self, field: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a field."""
+
+    def before_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a list element."""
+
+    def after_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a list element."""
+
+    def before_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map key."""
+
+    def after_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map key."""
+
+    def before_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a map value."""
+
+    def after_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a map value."""
+
+    @abstractmethod
+    def schema(self, schema: pa.Schema, field_results: List[T]) -> Schema:
+        """visit a schema"""
+
+    @abstractmethod
+    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
+        """visit a struct"""
+
+    @abstractmethod
+    def list(self, list_type: pa.ListType, element_result: T) -> T:
+        """visit a list"""
+
+    @abstractmethod
+    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
+        """visit a map"""
+
+    @abstractmethod
+    def primitive(self, primitive: pa.DataType) -> T:
+        """visit a primitive type"""
+
+
+def _get_field_id(field: pa.Field) -> int:
+    if field.metadata is not None:
+        field_metadata = {k.decode(): v.decode() for k, v in field.metadata.items()}
+        if field_id := field_metadata.get("PARQUET:field_id"):
+            return int(field_id)
+    raise ValueError(f"Field {field.name} does not have a field_id")
+
+
+class _ConvertToIceberg(PyarrowSchemaVisitor[IcebergType], ABC):
+    def schema(self, schema: pa.Schema, field_results: List[IcebergType]) -> Schema:
+        fields = []
+        for i in range(len(schema.names)):
+            field = schema.field(i)
+            field_id = _get_field_id(field)
+            field_type = field_results[i]
+            if field_type is not None:
+                fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable))
+        return Schema(*fields)
+
+    def struct(self, struct: pa.StructType, field_results: List[IcebergType]) -> IcebergType:
+        fields = []
+        for i in range(struct.num_fields):
+            field = struct[i]
+            field_id = _get_field_id(field)
+            # may need to check doc strings
+            field_type = field_results[i]
+            if field_type is not None:
+                fields.append(NestedField(field_id, field.name, field_type, required=not field.nullable))
+        return StructType(*fields)
+
+    def list(self, list_type: pa.ListType, element_result: IcebergType) -> IcebergType:
+        element_field = list_type.value_field
+        element_id = _get_field_id(element_field)
+        if element_result is not None:
+            return ListType(element_id, element_result, element_required=not element_field.nullable)
+        raise ValueError(f"List type must have element field: {list_type}")
+
+    def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: IcebergType) -> IcebergType:
+        key_field = map_type.key_field
+        key_id = _get_field_id(key_field)
+        value_field = map_type.item_field
+        value_id = _get_field_id(value_field)
+        if key_result is not None and value_result is not None:
+            return MapType(key_id, key_result, value_id, value_result, value_required=not value_field.nullable)
+        raise ValueError(f"Map type must have key and value fields: {map_type}")
+
+    def primitive(self, primitive: pa.DataType) -> IcebergType:
+        if pa.types.is_boolean(primitive):
+            return BooleanType()
+        elif pa.types.is_int32(primitive) or pa.types.is_uint32(primitive):
+            return IntegerType()
+        elif pa.types.is_int64(primitive) or pa.types.is_uint64(primitive):
+            return LongType()
+        elif pa.types.is_float32(primitive):
+            return FloatType()
+        elif pa.types.is_float64(primitive):
+            return DoubleType()
+        elif pa.types.is_decimal(primitive):
+            if isinstance(primitive, pa.Decimal256Type):
+                primitive = cast(pa.Decimal256Type, primitive)
+            else:
+                primitive = cast(pa.Decimal128Type, primitive)
+            return DecimalType(primitive.precision, primitive.scale)
+        elif pa.types.is_string(primitive):
+            return StringType()
+        elif pa.types.is_date(primitive):
+            return DateType()
+        elif pa.types.is_time(primitive):

Review Comment:
   Thank you for your review and for raising these concerns. I'd like to better understand what you find suspicious about PyArrow's ability to read Parquet files with different time or timestamp representations.
   
   From what I understand, Iceberg's TimeType, TimestampType, and TimestamptzType require microsecond (`us`) precision and UTC, and my current conversion ensures that no data is lost.
   
   In this case, PyArrow can read non-UTC timezones and `s`, `ms`, and `us` precision, but not nanosecond precision, since the final requested type during projection will be `us` and `UTC`: https://github.com/apache/iceberg/blob/283107d3608e65804314ba8fd2cb76aafdc36bbd/python/pyiceberg/io/pyarrow.py#L397-L404
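
   As a quick stand-alone illustration of that underlying cast behavior (not code from this PR), a lower-precision UTC timestamp can be promoted to the requested `us`/UTC type without loss, while a nanosecond value with sub-microsecond detail cannot be cast safely:

   ```python
   import pyarrow as pa

   # Promoting ms -> us is lossless, so PyArrow can serve the projected us/UTC type.
   ms_arr = pa.array([1_650_000_000_000], type=pa.timestamp("ms", tz="UTC"))
   us_arr = ms_arr.cast(pa.timestamp("us", tz="UTC"))

   # A ns value with sub-microsecond detail cannot be cast safely; PyArrow raises.
   ns_arr = pa.array([1_650_000_000_000_000_001], type=pa.timestamp("ns", tz="UTC"))
   try:
       ns_arr.cast(pa.timestamp("us", tz="UTC"))
   except pa.ArrowInvalid as exc:
       print(exc)  # casting would lose data
   ```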
   
   I chose to restrict the precision to `us` and the timezone to `UTC` because the Iceberg specification requires all stored time/timestamp values to use this precision and timezone. Since the `pyarrow_to_schema` visitor is used to read an Iceberg table's data files, I believe we should only support `us` and `UTC` here.
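
   For reference, here is a minimal sketch (not the exact code in this PR; the helper name is purely illustrative) of how the truncated time/timestamp branches above could enforce that `us`/UTC restriction:

   ```python
   import pyarrow as pa

   from pyiceberg.types import TimestampType, TimestamptzType, TimeType

   def _time_or_timestamp_to_iceberg(primitive: pa.DataType):
       """Illustrative sketch: accept only microsecond precision and, for
       zoned timestamps, the UTC timezone."""
       if pa.types.is_time64(primitive) and primitive.unit == "us":
           return TimeType()
       if pa.types.is_timestamp(primitive) and primitive.unit == "us":
           if primitive.tz is None:
               return TimestampType()
           if primitive.tz == "UTC":
               return TimestamptzType()
       raise TypeError(f"Unsupported time/timestamp type for Iceberg: {primitive}")
   ```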
   
   That said, I am not entirely sure about this. Supporting other precisions and timezones here would likely need more discussion and further changes. How about addressing those concerns in a follow-up PR if needed?
   
   Thank you again for your feedback, and please let me know if you have any further questions or concerns.




