moriyoshi commented on code in PR #8144:
URL: https://github.com/apache/iceberg/pull/8144#discussion_r1308670546


##########
python/pyiceberg/io/pyarrow.py:
##########
@@ -641,41 +649,125 @@ def primitive(self, primitive: pa.DataType) -> 
Optional[T]:
 
 
 def _get_field_id(field: pa.Field) -> Optional[int]:
-    for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
-        if field_id_str := field.metadata.get(pyarrow_field_id_key):
-            return int(field_id_str.decode())
+    if field.metadata is not None:
+        for pyarrow_field_id_key in PYARROW_FIELD_ID_KEYS:
+            if field_id_str := field.metadata.get(pyarrow_field_id_key):
+                return int(field_id_str.decode())
     return None
 
 
 def _get_field_doc(field: pa.Field) -> Optional[str]:
-    for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
-        if doc_str := field.metadata.get(pyarrow_doc_key):
-            return doc_str.decode()
+    if field.metadata is not None:
+        for pyarrow_doc_key in PYARROW_FIELD_DOC_KEYS:
+            if doc_str := field.metadata.get(pyarrow_doc_key):
+                return doc_str.decode()
     return None
 
 
-class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType, Schema]]):
+class _ConvertToIceberg(PyArrowSchemaVisitor[Union[IcebergType]]):
+    projected_schema: Union[Schema, ListType, MapType, None]
+    match_with_field_name: bool
+    ignore_unprojectable_fields: bool
+    projected_schema_stack: List[Tuple[Schema | ListType | MapType | None, 
Optional[_Literal["key", "value"]]]]
+    next: Optional[_Literal["key", "value"]]
+
     def _convert_fields(self, arrow_fields: Iterable[pa.Field], field_results: 
List[Optional[IcebergType]]) -> List[NestedField]:
         fields = []
         for i, field in enumerate(arrow_fields):
             field_id = _get_field_id(field)
             field_doc = _get_field_doc(field)
             field_type = field_results[i]
-            if field_type is not None and field_id is not None:
-                fields.append(NestedField(field_id, field.name, field_type, 
required=not field.nullable, doc=field_doc))
+            ib_field: Optional[NestedField] = None
+            if field_type is not None:
+                if field_id is not None:
+                    ib_field = NestedField(field_id, field.name, field_type, 
required=not field.nullable, doc=field_doc)
+                elif self.match_with_field_name:
+                    if not isinstance(self.projected_schema, Schema):
+                        raise ValueError("projected_schema must be provided if 
match_with_field_name is set to True")
+                    try:
+                        projected_field = 
self.projected_schema.find_field(field.name)
+                    except ValueError as e:
+                        if self.ignore_unprojectable_fields:
+                            continue
+                        raise ValueError(
+                            f"could not find a field that corresponds to 
{field.name} in projected schema {self.projected_schema}"
+                        ) from e
+                    ib_field = NestedField(
+                        projected_field.field_id, field.name, field_type, 
required=not field.nullable, doc=field_doc
+                    )
+            if ib_field is not None:
+                fields.append(ib_field)
         return fields
 
-    def schema(self, schema: pa.Schema, field_results: 
List[Optional[IcebergType]]) -> Schema:
-        return Schema(*self._convert_fields(schema, field_results))
+    def schema(self, schema: pa.Schema, field_results: 
List[Optional[IcebergType]]) -> StructType:
+        return StructType(*self._convert_fields(schema, field_results))
+
+    def before_field(self, field: pa.Field) -> None:
+        if not isinstance(field.type, (pa.StructType, pa.ListType, 
pa.MapType)):
+            return
+
+        projected_field: Optional[NestedField] = None
+
+        if isinstance(self.projected_schema, Schema):
+            field_id = _get_field_id(field)
+            if field_id is not None:
+                try:
+                    projected_field = 
self.projected_schema.find_field(field_id)
+                except ValueError:
+                    if not self.match_with_field_name:
+                        raise
+            if projected_field is None and self.match_with_field_name:
+                projected_field = self.projected_schema.find_field(field.name)
+        elif isinstance(self.projected_schema, ListType):
+            projected_field = self.projected_schema.element_field
+        elif isinstance(self.projected_schema, MapType):
+            if self.next == "key":
+                projected_field = self.projected_schema.key_field
+            elif self.next == "value":
+                projected_field = self.projected_schema.value_field
+            else:
+                raise AssertionError("should never get here")
+
+        self.projected_schema_stack.append((self.projected_schema, self.next))
+        inner_schema: Schema | ListType | MapType | None = None
+        if projected_field is not None:
+            field_type = projected_field.field_type
+            if isinstance(field_type, StructType):
+                inner_schema = Schema(*field_type.fields)
+            else:
+                if isinstance(field_type, (ListType, MapType)):
+                    inner_schema = field_type
+
+        self.projected_schema = inner_schema
+        self.next = "key" if isinstance(field.type, pa.MapType) else None
+
+    def after_field(self, field: pa.Field) -> None:
+        if self.next == "key":
+            self.next = "value"
+        elif self.next == "value":
+            self.next = None
+        if not isinstance(field.type, (pa.StructType, pa.ListType, 
pa.MapType)):
+            return
+        (self.projected_schema, self.next) = self.projected_schema_stack.pop()

Review Comment:
   The stack is used only when we hit a composite type during the 
traversal, so `self.next` is valid within such a type.  If this part causes the 
problem described above, then either it just doesn't work as intended or another 
part is the culprit, I guess. I've been trying to reproduce the problem by 
preparing the same schema that I think you used to check the behavior, but I 
have had no luck so far.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to