Fokko commented on code in PR #219:
URL: https://github.com/apache/iceberg-python/pull/219#discussion_r1450994558


##########
pyiceberg/io/pyarrow.py:
##########
@@ -698,77 +708,143 @@ def before_field(self, field: pa.Field) -> None:
     def after_field(self, field: pa.Field) -> None:
         """Override this method to perform an action immediately after 
visiting a field."""
 
+    def before_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting an element within a ListType."""
+
+    def after_list_element(self, element: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting an element within a ListType."""
+
+    def before_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a key within a MapType."""
+
+    def after_map_key(self, key: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a key within a MapType."""
+
+    def before_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately before 
visiting a value within a MapType."""
+
+    def after_map_value(self, value: pa.Field) -> None:
+        """Override this method to perform an action immediately after 
visiting a value within a MapType."""
+
     @abstractmethod
-    def schema(self, schema: pa.Schema, field_results: List[Optional[T]]) -> 
Optional[T]:
+    def schema(self, schema: pa.Schema, struct_result: T) -> T:
         """Visit a schema."""
 
     @abstractmethod
-    def struct(self, struct: pa.StructType, field_results: List[Optional[T]]) 
-> Optional[T]:
+    def struct(self, struct: pa.StructType, field_results: List[T]) -> T:
         """Visit a struct."""
 
     @abstractmethod
-    def list(self, list_type: pa.ListType, element_result: Optional[T]) -> 
Optional[T]:
+    def field(self, field: pa.Field, field_result: T) -> T:
+        """Visit a field."""
+
+    @abstractmethod
+    def list(self, list_type: pa.ListType, element_result: T) -> T:
         """Visit a list."""
 
     @abstractmethod
-    def map(self, map_type: pa.MapType, key_result: Optional[T], value_result: 
Optional[T]) -> Optional[T]:
+    def map(self, map_type: pa.MapType, key_result: T, value_result: T) -> T:
         """Visit a map."""
 
     @abstractmethod
-    def primitive(self, primitive: pa.DataType) -> Optional[T]:
+    def primitive(self, primitive: pa.DataType) -> T:
         """Visit a primitive type."""
 
 
 def _get_field_id(field: pa.Field) -> Optional[int]:
-    for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
-        if field_id_str := field.metadata.get(pyarrow_field_id_key):
-            return int(field_id_str.decode())
+    if field.metadata:
+        for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+            if field_id_str := field.metadata.get(pyarrow_field_id_key):
+                return int(field_id_str.decode())
     return None
 
 
 def _get_field_doc(field: pa.Field) -> Optional[str]:
-    for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
-        if doc_str := field.metadata.get(pyarrow_doc_key):
-            return doc_str.decode()
+    if field.metadata:
+        for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+            if doc_str := field.metadata.get(pyarrow_doc_key):
+                return doc_str.decode()
     return None
 
 
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
-    def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: 
List[Optional[IcebergType]]) -> List[NestedField]:
-        fields = []
-        for i, field in enumerate(arrow_fields):
-            field_id = _get_field_id(field)
-            field_doc = _get_field_doc(field)
-            field_type = field_results[i]
-            if field_type is not None and field_id is not None:
-                fields.append(NestedField(field_id, field.name, field_type, 
required=not field.nullable, doc=field_doc))
-        return fields
-
-    def schema(self, schema: pa.Schema, field_results: 
List[Optional[IcebergType]]) -> Schema:
-        return Schema(*self._convert_fields(schema, field_results))
-
-    def struct(self, struct: pa.StructType, field_results: 
List[Optional[IcebergType]]) -> IcebergType:
-        return StructType(*self._convert_fields(struct, field_results))
-
-    def list(self, list_type: pa.ListType, element_result: 
Optional[IcebergType]) -> Optional[IcebergType]:
+class _HasIds(PyArrowSchemaVisitor[bool]):
+    def schema(self, schema: pa.Schema, struct_result: bool) -> bool:
+        return struct_result
+
+    def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool:
+        return all(field_results)
+
+    def field(self, field: pa.Field, field_result: bool) -> bool:
+        return all([_get_field_id(field) is not None, field_result])
+
+    def list(self, list_type: pa.ListType, element_result: bool) -> bool:
         element_field = list_type.value_field
         element_id = _get_field_id(element_field)
-        if element_result is not None and element_id is not None:
-            return ListType(element_id, element_result, element_required=not 
element_field.nullable)
-        return None
+        return element_result and element_id is not None
 
-    def map(
-        self, map_type: pa.MapType, key_result: Optional[IcebergType], 
value_result: Optional[IcebergType]
-    ) -> Optional[IcebergType]:
+    def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) 
-> bool:
         key_field = map_type.key_field
         key_id = _get_field_id(key_field)
         value_field = map_type.item_field
         value_id = _get_field_id(value_field)
-        if key_result is not None and value_result is not None and key_id is 
not None and value_id is not None:
-            return MapType(key_id, key_result, value_id, value_result, 
value_required=not value_field.nullable)
-        return None
+        return all([key_id is not None, value_id is not None, key_result, 
value_result])
+
+    def primitive(self, primitive: pa.DataType) -> bool:
+        return True
 
-    def primitive(self, primitive: pa.DataType) -> IcebergType:
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+    """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from 
name_mapping if provided."""
+
+    field_names: List[str]
+    name_mapping: Optional[NameMapping]
+
+    def __init__(self, name_mapping: Optional[NameMapping] = None) -> None:
+        self.field_names = []
+        self.name_mapping = name_mapping
+
+    def _current_path(self) -> str:
+        return ".".join(self.field_names)
+
+    def _get_field_id(self, field: pa.Field) -> int:

Review Comment:
   Thanks for raising this.
   
   For the Write support what we do is: Convert the Arrow schema to Iceberg. 
Create a new table from it where it will [get fresh IDs 
assigned](https://github.com/apache/iceberg-python/blob/4593208f67e1101a94059a93bfb13f609ac2ad80/pyiceberg/schema.py#L1216-L1218),
 and then you can use that schema to write the dataframe to the table.
   
   > Would it make sense to pass an optional argument "assign_ids" to 
_ConvertToIceberg class where the user can ask the class to assign field IDs in 
order, instead of using the attached metadata field_id or the name_mapping?
   
   I don't think that's a good idea, and this is mostly from the learnings that 
we had from Java. For example:
   
   ```
   1: firstname string
   2: lastname string
   ```
   
   And you add a field called `title`:
   
   ```
   1: title string
   2: firstname string
   3: lastname string
   ```
   
   This way we re-assign the fields, and we could only add fields at the end. 
But this gets complex with nested fields, because they need to know about 
pre-order traversal.
   
   Can you elaborate more on the use-case? A workaround is to create a 
name-mapping, and then pass the name mapping to the visitor. After this PR, we 
probably want to add the 
[UnionByNameVisitor](https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/schema/UnionByNameVisitor.java)
 as well. This is important when we want to do the schema evolution.
   
   Related, in https://github.com/apache/iceberg-python/pull/252 I'm working on 
adding the field-IDs to the Arrow schema. This should it also make it easier to 
perform the union, in case you want to write to an existing table, but with a 
new column. This PR is blocked on the new Arrow release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to