HonahX commented on code in PR #219:
URL: https://github.com/apache/iceberg-python/pull/219#discussion_r1456922888


##########
pyiceberg/io/pyarrow.py:
##########
@@ -733,42 +854,178 @@ def _get_field_id(field: pa.Field) -> Optional[int]:
     )
 
 
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
-    def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: 
List[Optional[IcebergType]]) -> List[NestedField]:
-        fields = []
-        for i, field in enumerate(arrow_fields):
-            field_id = _get_field_id(field)
-            field_doc = doc_str.decode() if (field.metadata and (doc_str := 
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
-            field_type = field_results[i]
-            if field_type is not None and field_id is not None:
-                fields.append(NestedField(field_id, field.name, field_type, 
required=not field.nullable, doc=field_doc))
-        return fields
-
-    def schema(self, schema: pa.Schema, field_results: 
List[Optional[IcebergType]]) -> Schema:
-        return Schema(*self._convert_fields(schema, field_results))
-
-    def struct(self, struct: pa.StructType, field_results: 
List[Optional[IcebergType]]) -> IcebergType:
-        return StructType(*self._convert_fields(struct, field_results))
-
-    def list(self, list_type: pa.ListType, element_result: 
Optional[IcebergType]) -> Optional[IcebergType]:
+class _HasIds(PyArrowSchemaVisitor[bool]):
+    def schema(self, schema: pa.Schema, struct_result: bool) -> bool:
+        return struct_result
+
+    def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool:
+        return all(field_results)
+
+    def field(self, field: pa.Field, field_result: bool) -> bool:
+        return all([_get_field_id(field) is not None, field_result])
+
+    def list(self, list_type: pa.ListType, element_result: bool) -> bool:
         element_field = list_type.value_field
         element_id = _get_field_id(element_field)
-        if element_result is not None and element_id is not None:
-            return ListType(element_id, element_result, element_required=not 
element_field.nullable)
-        return None
+        return element_result and element_id is not None
 
-    def map(
-        self, map_type: pa.MapType, key_result: Optional[IcebergType], 
value_result: Optional[IcebergType]
-    ) -> Optional[IcebergType]:
+    def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) 
-> bool:
         key_field = map_type.key_field
         key_id = _get_field_id(key_field)
         value_field = map_type.item_field
         value_id = _get_field_id(value_field)
-        if key_result is not None and value_result is not None and key_id is 
not None and value_id is not None:
-            return MapType(key_id, key_result, value_id, value_result, 
value_required=not value_field.nullable)
-        return None
+        return all([key_id is not None, value_id is not None, key_result, 
value_result])
+
+    def primitive(self, primitive: pa.DataType) -> bool:
+        return True
+
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+    """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from 
name_mapping if provided."""
+
+    _field_names: List[str]
+    _name_mapping: Optional[NameMapping]
+
+    def __init__(self, name_mapping: Optional[NameMapping] = None) -> None:
+        self._field_names = []
+        self._name_mapping = name_mapping
+
+    def _current_path(self) -> str:
+        return ".".join(self._field_names)
+
+    def _field_id(self, field: pa.Field) -> int:
+        if self._name_mapping:
+            return self._name_mapping.find(self._current_path()).field_id
+        elif (field_id := _get_field_id(field)) is not None:
+            return field_id
+        else:
+            raise ValueError(f"Cannot convert {field} to Iceberg Field as 
field_id is empty.")
+
+    def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema:
+        return Schema(*struct_result.fields)
+
+    def struct(self, struct: pa.StructType, field_results: List[NestedField]) 
-> StructType:
+        return StructType(*field_results)
+
+    def field(self, field: pa.Field, field_result: IcebergType) -> NestedField:
+        field_id = self._field_id(field)
+        field_doc = doc_str.decode() if (field.metadata and (doc_str := 
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
+        field_type = field_result
+        return NestedField(field_id, field.name, field_type, required=not 
field.nullable, doc=field_doc)
+
+    def list(self, list_type: pa.ListType, element_result: IcebergType) -> 
ListType:
+        element_field = list_type.value_field
+        self._field_names.append(LIST_ELEMENT_NAME)
+        element_id = self._field_id(element_field)
+        self._field_names.pop()
+        return ListType(element_id, element_result, element_required=not 
element_field.nullable)
 
-    def primitive(self, primitive: pa.DataType) -> IcebergType:
+    def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: 
IcebergType) -> MapType:
+        key_field = map_type.key_field
+        self._field_names.append(MAP_KEY_NAME)
+        key_id = self._field_id(key_field)
+        self._field_names.pop()
+        value_field = map_type.item_field
+        self._field_names.append(MAP_VALUE_NAME)
+        value_id = self._field_id(value_field)
+        self._field_names.pop()
+        return MapType(key_id, key_result, value_id, value_result, 
value_required=not value_field.nullable)
+
+    def primitive(self, primitive: pa.DataType) -> PrimitiveType:
+        if pa.types.is_boolean(primitive):
+            return BooleanType()
+        elif pa.types.is_int32(primitive):
+            return IntegerType()
+        elif pa.types.is_int64(primitive):
+            return LongType()
+        elif pa.types.is_float32(primitive):
+            return FloatType()
+        elif pa.types.is_float64(primitive):
+            return DoubleType()
+        elif isinstance(primitive, pa.Decimal128Type):
+            primitive = cast(pa.Decimal128Type, primitive)
+            return DecimalType(primitive.precision, primitive.scale)
+        elif pa.types.is_string(primitive):
+            return StringType()
+        elif pa.types.is_date32(primitive):
+            return DateType()
+        elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
+            return TimeType()
+        elif pa.types.is_timestamp(primitive):
+            primitive = cast(pa.TimestampType, primitive)
+            if primitive.unit == "us":
+                if primitive.tz == "UTC" or primitive.tz == "+00:00":
+                    return TimestamptzType()
+                elif primitive.tz is None:
+                    return TimestampType()
+        elif pa.types.is_binary(primitive):
+            return BinaryType()
+        elif pa.types.is_fixed_size_binary(primitive):
+            primitive = cast(pa.FixedSizeBinaryType, primitive)
+            return FixedType(primitive.byte_width)
+
+        raise TypeError(f"Unsupported type: {primitive}")
+
+    def before_field(self, field: pa.Field) -> None:
+        self._field_names.append(field.name)
+
+    def after_field(self, field: pa.Field) -> None:
+        self._field_names.pop()
+
+    def before_list_element(self, element: pa.Field) -> None:
+        self._field_names.append(LIST_ELEMENT_NAME)
+
+    def after_list_element(self, element: pa.Field) -> None:
+        self._field_names.pop()
+
+    def before_map_key(self, key: pa.Field) -> None:
+        self._field_names.append(MAP_KEY_NAME)
+
+    def after_map_key(self, element: pa.Field) -> None:
+        self._field_names.pop()
+
+    def before_map_value(self, value: pa.Field) -> None:
+        self._field_names.append(MAP_VALUE_NAME)
+
+    def after_map_value(self, element: pa.Field) -> None:
+        self._field_names.pop()
+
+
+class 
_ConvertToIcebergWithFreshIds(PreOrderPyArrowSchemaVisitor[Union[IcebergType, 
Schema]]):

Review Comment:
   Sorry for the late reply. @syun64 Overall, I think it is an effective way to 
reduce code duplication while achieving the same functionality. I want to share 
what I found while trying this approach:
   
   1. Assigning -1 to every field and then using `assign_fresh_schema_ids` does 
not seem to work out of the box. This is because `_SetFreshIds` uses a 
dictionary to keep track of the assigned IDs: 
https://github.com/apache/iceberg-python/blob/06e2b2df48e9ff383a09550e3e307d290fe39ddc/pyiceberg/schema.py#L1239-L1242
   If every field in our current schema has the ID -1, all the fields at the 
same level of the resulting new schema will also end up with the same ID. To 
resolve this, we need to either update the `_SetFreshIds` visitor or assign 
distinct IDs (-1, -2, ...) when converting the pyarrow schema.
   
   2. I am wondering whether we can still have separate visitors for the normal 
read case and the ignore_ids case by extracting the common logic into a parent 
class. Based on my observation, `schema`, `struct`, `primitive`, and `field` 
are the same for both cases. If we refactor it into the following structure:
   ```python
   class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]], 
ABC):
     ...
   
   class _ConvertToIcebergWithFieldIds(_ConvertToIceberg):
     ...
   
   class _ConvertToIcebergWithoutIds(_ConvertToIceberg):
   ```
   then we only need to implement `list` and `map` in each of the two visitors, 
each with its own way of obtaining field IDs. 
   
   In this way, we can still separate the two use cases while keeping code 
duplication low. However, this does hurt code readability, because the visitor 
logic is split across two places. 
   
   I am raising this primarily because a separate and properly named visitor 
might emphasize that this is a special case for special usage, so it will not 
confuse others who read this code for reference. @syun64 @Fokko What do you 
think of this approach, compared with the `_ignore_ids` boolean flag suggested 
by @syun64 (which has the least code duplication and is simpler to read)? 
   
   (I have a draft implementation for 2 in my own repo: 
https://github.com/HonahX/iceberg-python/blob/de14f5cb8d91a26541356dd3c614bee8c4b8cb8c/pyiceberg/io/pyarrow.py#L805)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to