Fokko commented on code in PR #296: URL: https://github.com/apache/iceberg-python/pull/296#discussion_r1467074883
##########
pyiceberg/table/__init__.py:
##########
@@ -1995,6 +2020,156 @@ def primitive(self, primitive: PrimitiveType) -> Optional[IcebergType]:
         return primitive
+class UnionByNameVisitor(SchemaWithPartnerVisitor[int, bool]):
+    update_schema: UpdateSchema
+    new_schema: Schema
+    case_sensitive: bool
+
+    def __init__(self, update_schema: UpdateSchema, new_schema: Schema, case_sensitive: bool) -> None:
+        self.update_schema = update_schema
+        self.new_schema = new_schema
+        self.case_sensitive = case_sensitive
+
+    def schema(self, schema: Schema, partner_id: Optional[int], struct_result: bool) -> bool:
+        return struct_result
+
+    def struct(self, struct: StructType, partner_id: Optional[int], missing_positions: List[bool]) -> bool:
+        if partner_id is None:
+            return True
+
+        fields = struct.fields
+        partner_struct = self._find_field_type(partner_id)
+
+        for pos, missing in enumerate(missing_positions):
+            if missing:
+                self._add_column(partner_id, fields[pos])
+            else:
+                field = fields[pos]
+                if nested_field := partner_struct.field_by_name(field.name, case_sensitive=self.case_sensitive):
+                    self._update_column(field, nested_field)
+
+        return False
+
+    def _add_column(self, parent_id: int, field: NestedField) -> None:
+        if parent_name := self.new_schema.find_column_name(parent_id):
+            path: Tuple[str, ...] = (parent_name, field.name)
+        else:
+            path = (field.name,)
+
+        self.update_schema.add_column(path=path, field_type=field.field_type, required=field.required, doc=field.doc)
+
+    def _update_column(self, field: NestedField, existing_field: NestedField) -> None:
+        full_name = self.new_schema.find_column_name(existing_field.field_id)
+
+        if full_name is None:
+            raise ValueError(f"Could not find field: {existing_field}")
+
+        if field.optional and existing_field.required:
+            self.update_schema.make_column_optional(full_name)
+
+        if field.field_type.is_primitive and field.field_type != existing_field.field_type:
+            self.update_schema.update_column(full_name, field_type=field.field_type)
+
+        if field.doc is not None and not field.doc != existing_field.doc:
+            self.update_schema.update_column(full_name, doc=field.doc)
+
+    def _find_field_type(self, field_id: int) -> IcebergType:
+        if field_id == -1:
+            return self.new_schema.as_struct()
+        else:
+            return self.new_schema.find_field(field_id).field_type
+
+    def field(self, field: NestedField, field_partner: Optional[int], field_result: bool) -> bool:

Review Comment:
I like it, thanks for the suggestion

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org