Fokko commented on code in PR #296:
URL: https://github.com/apache/iceberg-python/pull/296#discussion_r1467632051


##########
pyiceberg/table/__init__.py:
##########
@@ -1995,6 +2020,159 @@ def primitive(self, primitive: PrimitiveType) -> 
Optional[IcebergType]:
         return primitive
 
 
+class UnionByNameVisitor(SchemaWithPartnerVisitor[int, bool]):
+    update_schema: UpdateSchema
+    existing_schema: Schema
+    case_sensitive: bool
+
+    def __init__(self, update_schema: UpdateSchema, existing_schema: Schema, 
case_sensitive: bool) -> None:
+        self.update_schema = update_schema
+        self.existing_schema = existing_schema
+        self.case_sensitive = case_sensitive
+
+    def schema(self, schema: Schema, partner_id: Optional[int], struct_result: 
bool) -> bool:
+        return struct_result
+
+    def struct(self, struct: StructType, partner_id: Optional[int], 
missing_positions: List[bool]) -> bool:
+        if partner_id is None:
+            return True
+
+        fields = struct.fields
+        partner_struct = self._find_field_type(partner_id)
+
+        if not partner_struct.is_struct:
+            raise ValueError(f"Expected a struct, got: {partner_struct}")
+
+        for pos, missing in enumerate(missing_positions):
+            if missing:
+                self._add_column(partner_id, fields[pos])
+            else:
+                field = fields[pos]
+                if nested_field := partner_struct.field_by_name(field.name, 
case_sensitive=self.case_sensitive):
+                    self._update_column(field, nested_field)
+
+        return False
+
+    def _add_column(self, parent_id: int, field: NestedField) -> None:
+        if parent_name := self.existing_schema.find_column_name(parent_id):
+            path: Tuple[str, ...] = (parent_name, field.name)
+        else:
+            path = (field.name,)
+
+        self.update_schema.add_column(path=path, field_type=field.field_type, 
required=field.required, doc=field.doc)
+
+    def _update_column(self, field: NestedField, existing_field: NestedField) 
-> None:
+        full_name = 
self.existing_schema.find_column_name(existing_field.field_id)
+
+        if full_name is None:
+            raise ValueError(f"Could not find field: {existing_field}")
+
+        if field.optional and existing_field.required:
+            self.update_schema.make_column_optional(full_name)
+
+        if field.field_type.is_primitive and field.field_type != 
existing_field.field_type:
+            self.update_schema.update_column(full_name, 
field_type=field.field_type)
+
+        if field.doc is not None and field.doc != existing_field.doc:
+            self.update_schema.update_column(full_name, doc=field.doc)
+
+    def _find_field_type(self, field_id: int) -> IcebergType:
+        if field_id == -1:
+            return self.existing_schema.as_struct()
+        else:
+            return self.existing_schema.find_field(field_id).field_type
+
+    def field(self, field: NestedField, partner_id: Optional[int], 
field_result: bool) -> bool:
+        return partner_id is None
+
+    def list(self, list_type: ListType, list_partner_id: Optional[int], 
element_missing: bool) -> bool:
+        if list_partner_id is None:
+            return True
+
+        if element_missing:
+            raise ValueError("Error traversing schemas: element is missing, 
but list is present")
+
+        partner_list_type = self._find_field_type(list_partner_id)
+        if not isinstance(partner_list_type, ListType):
+            raise ValueError(f"Expected list-type, got: {partner_list_type}")
+
+        self._update_column(list_type.element_field, 
partner_list_type.element_field)
+
+        return False
+
+    def map(self, map_type: MapType, map_partner_id: Optional[int], 
key_missing: bool, value_missing: bool) -> bool:
+        if map_partner_id is None:
+            return True
+
+        if key_missing:
+            raise ValueError("Error traversing schemas: key is missing, but 
map is present")
+
+        if value_missing:
+            raise ValueError("Error traversing schemas: value is missing, but 
map is present")
+
+        partner_map_type = self._find_field_type(map_partner_id)
+        if not isinstance(partner_map_type, MapType):
+            raise ValueError(f"Expected map-type, got: {partner_map_type}")
+
+        self._update_column(map_type.key_field, partner_map_type.key_field)
+        self._update_column(map_type.value_field, partner_map_type.value_field)
+
+        return False
+
+    def primitive(self, primitive: PrimitiveType, primitive_partner: 
Optional[int]) -> bool:

Review Comment:
   Missed that one, thanks :)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to