syun64 commented on code in PR #219:
URL: https://github.com/apache/iceberg-python/pull/219#discussion_r1457593015


##########
pyiceberg/io/pyarrow.py:
##########
@@ -733,42 +854,178 @@ def _get_field_id(field: pa.Field) -> Optional[int]:
     )
 
 
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
-    def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: 
List[Optional[IcebergType]]) -> List[NestedField]:
-        fields = []
-        for i, field in enumerate(arrow_fields):
-            field_id = _get_field_id(field)
-            field_doc = doc_str.decode() if (field.metadata and (doc_str := 
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
-            field_type = field_results[i]
-            if field_type is not None and field_id is not None:
-                fields.append(NestedField(field_id, field.name, field_type, 
required=not field.nullable, doc=field_doc))
-        return fields
-
-    def schema(self, schema: pa.Schema, field_results: 
List[Optional[IcebergType]]) -> Schema:
-        return Schema(*self._convert_fields(schema, field_results))
-
-    def struct(self, struct: pa.StructType, field_results: 
List[Optional[IcebergType]]) -> IcebergType:
-        return StructType(*self._convert_fields(struct, field_results))
-
-    def list(self, list_type: pa.ListType, element_result: 
Optional[IcebergType]) -> Optional[IcebergType]:
+class _HasIds(PyArrowSchemaVisitor[bool]):
+    def schema(self, schema: pa.Schema, struct_result: bool) -> bool:
+        return struct_result
+
+    def struct(self, struct: pa.StructType, field_results: List[bool]) -> bool:
+        return all(field_results)
+
+    def field(self, field: pa.Field, field_result: bool) -> bool:
+        return all([_get_field_id(field) is not None, field_result])
+
+    def list(self, list_type: pa.ListType, element_result: bool) -> bool:
         element_field = list_type.value_field
         element_id = _get_field_id(element_field)
-        if element_result is not None and element_id is not None:
-            return ListType(element_id, element_result, element_required=not 
element_field.nullable)
-        return None
+        return element_result and element_id is not None
 
-    def map(
-        self, map_type: pa.MapType, key_result: Optional[IcebergType], 
value_result: Optional[IcebergType]
-    ) -> Optional[IcebergType]:
+    def map(self, map_type: pa.MapType, key_result: bool, value_result: bool) 
-> bool:
         key_field = map_type.key_field
         key_id = _get_field_id(key_field)
         value_field = map_type.item_field
         value_id = _get_field_id(value_field)
-        if key_result is not None and value_result is not None and key_id is 
not None and value_id is not None:
-            return MapType(key_id, key_result, value_id, value_result, 
value_required=not value_field.nullable)
-        return None
+        return all([key_id is not None, value_id is not None, key_result, 
value_result])
+
+    def primitive(self, primitive: pa.DataType) -> bool:
+        return True
+
+
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+    """Converts PyArrowSchema to Iceberg Schema. Applies the IDs from 
name_mapping if provided."""
+
+    _field_names: List[str]
+    _name_mapping: Optional[NameMapping]
+
+    def __init__(self, name_mapping: Optional[NameMapping] = None) -> None:
+        self._field_names = []
+        self._name_mapping = name_mapping
+
+    def _current_path(self) -> str:
+        return ".".join(self._field_names)
+
+    def _field_id(self, field: pa.Field) -> int:
+        if self._name_mapping:
+            return self._name_mapping.find(self._current_path()).field_id
+        elif (field_id := _get_field_id(field)) is not None:
+            return field_id
+        else:
+            raise ValueError(f"Cannot convert {field} to Iceberg Field as 
field_id is empty.")
+
+    def schema(self, schema: pa.Schema, struct_result: StructType) -> Schema:
+        return Schema(*struct_result.fields)
+
+    def struct(self, struct: pa.StructType, field_results: List[NestedField]) 
-> StructType:
+        return StructType(*field_results)
+
+    def field(self, field: pa.Field, field_result: IcebergType) -> NestedField:
+        field_id = self._field_id(field)
+        field_doc = doc_str.decode() if (field.metadata and (doc_str := 
field.metadata.get(PYARROW_FIELD_DOC_KEY))) else None
+        field_type = field_result
+        return NestedField(field_id, field.name, field_type, required=not 
field.nullable, doc=field_doc)
+
+    def list(self, list_type: pa.ListType, element_result: IcebergType) -> 
ListType:
+        element_field = list_type.value_field
+        self._field_names.append(LIST_ELEMENT_NAME)
+        element_id = self._field_id(element_field)
+        self._field_names.pop()
+        return ListType(element_id, element_result, element_required=not 
element_field.nullable)
 
-    def primitive(self, primitive: pa.DataType) -> IcebergType:
+    def map(self, map_type: pa.MapType, key_result: IcebergType, value_result: 
IcebergType) -> MapType:
+        key_field = map_type.key_field
+        self._field_names.append(MAP_KEY_NAME)
+        key_id = self._field_id(key_field)
+        self._field_names.pop()
+        value_field = map_type.item_field
+        self._field_names.append(MAP_VALUE_NAME)
+        value_id = self._field_id(value_field)
+        self._field_names.pop()
+        return MapType(key_id, key_result, value_id, value_result, 
value_required=not value_field.nullable)
+
+    def primitive(self, primitive: pa.DataType) -> PrimitiveType:
+        if pa.types.is_boolean(primitive):
+            return BooleanType()
+        elif pa.types.is_int32(primitive):
+            return IntegerType()
+        elif pa.types.is_int64(primitive):
+            return LongType()
+        elif pa.types.is_float32(primitive):
+            return FloatType()
+        elif pa.types.is_float64(primitive):
+            return DoubleType()
+        elif isinstance(primitive, pa.Decimal128Type):
+            primitive = cast(pa.Decimal128Type, primitive)
+            return DecimalType(primitive.precision, primitive.scale)
+        elif pa.types.is_string(primitive):
+            return StringType()
+        elif pa.types.is_date32(primitive):
+            return DateType()
+        elif isinstance(primitive, pa.Time64Type) and primitive.unit == "us":
+            return TimeType()
+        elif pa.types.is_timestamp(primitive):
+            primitive = cast(pa.TimestampType, primitive)
+            if primitive.unit == "us":
+                if primitive.tz == "UTC" or primitive.tz == "+00:00":
+                    return TimestamptzType()
+                elif primitive.tz is None:
+                    return TimestampType()
+        elif pa.types.is_binary(primitive):
+            return BinaryType()
+        elif pa.types.is_fixed_size_binary(primitive):
+            primitive = cast(pa.FixedSizeBinaryType, primitive)
+            return FixedType(primitive.byte_width)
+
+        raise TypeError(f"Unsupported type: {primitive}")
+
+    def before_field(self, field: pa.Field) -> None:
+        self._field_names.append(field.name)
+
+    def after_field(self, field: pa.Field) -> None:
+        self._field_names.pop()
+
+    def before_list_element(self, element: pa.Field) -> None:
+        self._field_names.append(LIST_ELEMENT_NAME)
+
+    def after_list_element(self, element: pa.Field) -> None:
+        self._field_names.pop()
+
+    def before_map_key(self, key: pa.Field) -> None:
+        self._field_names.append(MAP_KEY_NAME)
+
+    def after_map_key(self, element: pa.Field) -> None:
+        self._field_names.pop()
+
+    def before_map_value(self, value: pa.Field) -> None:
+        self._field_names.append(MAP_VALUE_NAME)
+
+    def after_map_value(self, element: pa.Field) -> None:
+        self._field_names.pop()
+
+
+class 
_ConvertToIcebergWithFreshIds(PreOrderPyArrowSchemaVisitor[Union[IcebergType, 
Schema]]):

Review Comment:
   > I think the problem here is that we don't have an API like in Spark where 
we can [nicely hide 
things](https://github.com/apache/iceberg/blob/main/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkCatalog.java#L225-L242).
 I'm almost tempted to allow creating a table from a PyArrow table 
create_table_from_table(df: pa.Table), but that mixes in PyArrow into the main 
API, but refrains us from exposing these things to the user (which isn't super 
user friendly in general). WDYT @syun64 @HonahX ?
   
   I'm in agreement with this idea. I see three ways a user would want to 
create an Iceberg table:
   1. Completely manual - by specifying the schema, field by field
   2. By inferring the schema from an existing strongly-typed file or pyarrow 
table
   3. By copying the schema of an existing iceberg table (migration)
   
   Since we are only concerned with the schema, and not the data: what are your 
thoughts in using the pyarrow schema (instead of pyarrow table) as the input 
for this function?
   
   > Assuming that the Arrow dataframe doesn't have a schema, we'll use name 
mapping to set the names and convert it to an Iceberg schema, and that's all 
safe. So we need to have the ability to apply name-mapping on a PyArrow schema.
   
   Sounds good @Fokko . Since this PR already introduces the ability to apply 
name-mapping onto a PyArrow Schema and create a pyiceberg.Schema, if this is 
the approach we'd like to take, we would need the ability to generate 
name-mapping from a PyArrow Schema with no IDs. This is different from existing 
_CreateMapping which creates name mapping based on an existing pyiceberg Schema 
which already have IDs assigned.
   
   > class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]], 
ABC):
     ...
   class _ConvertToIcebergWithFieldIds(_ConvertToIceberg):
     ...
   class _ConvertToIcebergWithoutIds(_ConvertToIceberg):
   
   One thing I wanted to note, is that the task of assigning fresh IDs to a 
schema needs to be a pre-order visitor, instead of post-order like 
_ConvertToIceberg or _CreateMapping. This ensures that the field_id is assigned 
to the field before they are assigned to the element, key or values. I think 
that would prevent us from having the two visitors inherit from the same parent 
class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to