sungwy commented on code in PR #1014:
URL: https://github.com/apache/iceberg-python/pull/1014#discussion_r1706931818


##########
pyiceberg/table/name_mapping.py:
##########
@@ -74,6 +75,11 @@ class NameMapping(IcebergRootModel[List[MappedField]]):
     def _field_by_name(self) -> Dict[str, MappedField]:
         return visit_name_mapping(self, _IndexByName())
 
+    @deprecated(
+        deprecated_in="0.7.0",
+        removed_in="0.8.0",

Review Comment:
   Should this be 0.8.0? 😄 
   
   ```suggestion
           deprecated_in="0.8.0",
           removed_in="0.9.0",
   ```



##########
pyiceberg/table/name_mapping.py:
##########
@@ -248,3 +254,127 @@ def create_mapping_from_schema(schema: Schema) -> 
NameMapping:
 
 def update_mapping(mapping: NameMapping, updates: Dict[int, NestedField], 
adds: Dict[int, List[NestedField]]) -> NameMapping:
     return NameMapping(visit_name_mapping(mapping, _UpdateMapping(updates, 
adds)))
+
+
+class NameMappingAccessor(PartnerAccessor[MappedField]):
+    def schema_partner(self, partner: Optional[MappedField]) -> 
Optional[MappedField]:
+        return partner
+
+    def field_partner(
+        self, partner_struct: Optional[Union[List[MappedField], MappedField]], 
_: int, field_name: str
+    ) -> Optional[MappedField]:
+        if partner_struct is not None:
+            if isinstance(partner_struct, MappedField):
+                partner_struct = partner_struct.fields
+
+            for field in partner_struct:
+                if field_name in field.names:
+                    return field
+
+        return None
+
+    def list_element_partner(self, partner_list: Optional[MappedField]) -> 
Optional[MappedField]:
+        if partner_list is not None:
+            for field in partner_list.fields:
+                if "element" in field.names:
+                    return field
+        return None
+
+    def map_key_partner(self, partner_map: Optional[MappedField]) -> 
Optional[MappedField]:
+        if partner_map is not None:
+            for field in partner_map.fields:
+                if "key" in field.names:
+                    return field
+        return None
+
+    def map_value_partner(self, partner_map: Optional[MappedField]) -> 
Optional[MappedField]:
+        if partner_map is not None:
+            for field in partner_map.fields:
+                if "value" in field.names:
+                    return field
+        return None
+
+
+class NameMappingProjectionVisitor(SchemaWithPartnerVisitor[MappedField, 
IcebergType]):
+    current_path: List[str]
+
+    def __init__(self) -> None:
+        # For keeping track where we are in case when a field cannot be found
+        self.current_path = []
+
+    def before_field(self, field: NestedField, field_partner: Optional[P]) -> 
None:
+        self.current_path.append(field.name)
+
+    def after_field(self, field: NestedField, field_partner: Optional[P]) -> 
None:
+        self.current_path.pop()
+
+    def before_list_element(self, element: NestedField, element_partner: 
Optional[P]) -> None:
+        self.current_path.append("element")
+
+    def after_list_element(self, element: NestedField, element_partner: 
Optional[P]) -> None:
+        self.current_path.pop()
+
+    def before_map_key(self, key: NestedField, key_partner: Optional[P]) -> 
None:
+        self.current_path.append("key")
+
+    def after_map_key(self, key: NestedField, key_partner: Optional[P]) -> 
None:
+        self.current_path.pop()
+
+    def before_map_value(self, value: NestedField, value_partner: Optional[P]) 
-> None:
+        self.current_path.append("value")
+
+    def after_map_value(self, value: NestedField, value_partner: Optional[P]) 
-> None:
+        self.current_path.pop()
+
+    def schema(self, schema: Schema, schema_partner: Optional[MappedField], 
struct_result: StructType) -> IcebergType:
+        return Schema(*struct_result.fields, schema_id=schema.schema_id)
+
+    def struct(self, struct: StructType, struct_partner: 
Optional[MappedField], field_results: List[NestedField]) -> IcebergType:
+        return StructType(*field_results)
+
+    def field(self, field: NestedField, field_partner: Optional[MappedField], 
field_result: IcebergType) -> IcebergType:
+        if field_partner is None:
+            raise ValueError(f"Field missing from NameMapping: {field}")

Review Comment:
   nit: What do you think of aligning the error message with the rest of the 
errors by using `self.current_path`? 
   
   A NestedField does not keep a record of its parentage, so I think 
`self.current_path` will be more representative of where we are in the 
NameMapping tree.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to