Fokko commented on code in PR #8393:
URL: https://github.com/apache/iceberg/pull/8393#discussion_r1305661994
##########
python/pyiceberg/schema.py:
##########
@@ -273,6 +277,49 @@ def field_ids(self) -> Set[int]:
"""Returns the IDs of the current schema."""
return set(self._name_to_id.values())
+ @staticmethod
+ def validate_identifier_field(field_id: int, id_to_field: Dict[int,
NestedField], id_to_parent: Dict[int, int]) -> None:
+ """Validates that the field with the given ID is a valid identifier
field.
+
+ Args:
+ field_id: The ID of the field to validate.
+ id_to_field: A map from field IDs to field objects.
+ id_to_parent: A map from field IDs to their parent field IDs.
+
+ Raises:
+ ValueError: If the field is not valid.
+ """
+ field = id_to_field.get(field_id)
+ if field is None:
+ raise ValueError(f"Cannot add fieldId {field_id} as an identifier
field: field does not exist")
+
+ if not field.field_type.is_primitive:
+ raise ValueError(f"Cannot add field {field.name} as an identifier
field: not a primitive type field")
+
+ if not field.required:
+ raise ValueError(f"Cannot add field {field.name} as an identifier
field: not a required field")
+
+ if isinstance(field.field_type, DoubleType) or
isinstance(field.field_type, FloatType):
Review Comment:
These can be combined:
```suggestion
if isinstance(field.field_type, (DoubleType, FloatType)):
```
##########
python/pyiceberg/schema.py:
##########
@@ -273,6 +277,49 @@ def field_ids(self) -> Set[int]:
"""Returns the IDs of the current schema."""
return set(self._name_to_id.values())
+ @staticmethod
+ def validate_identifier_field(field_id: int, id_to_field: Dict[int,
NestedField], id_to_parent: Dict[int, int]) -> None:
+ """Validates that the field with the given ID is a valid identifier
field.
+
+ Args:
+ field_id: The ID of the field to validate.
+ id_to_field: A map from field IDs to field objects.
+ id_to_parent: A map from field IDs to their parent field IDs.
+
+ Raises:
+ ValueError: If the field is not valid.
+ """
+ field = id_to_field.get(field_id)
+ if field is None:
+ raise ValueError(f"Cannot add fieldId {field_id} as an identifier
field: field does not exist")
+
+ if not field.field_type.is_primitive:
+ raise ValueError(f"Cannot add field {field.name} as an identifier
field: not a primitive type field")
+
+ if not field.required:
+ raise ValueError(f"Cannot add field {field.name} as an identifier
field: not a required field")
+
+ if isinstance(field.field_type, DoubleType) or
isinstance(field.field_type, FloatType):
+ raise ValueError(f"Cannot add field {field.name} as an identifier
field: must not be float or double field")
+
+ # Check whether the nested field is in a chain of required struct
fields
+ # Exploring from root for better error message for list and map types
+ parent_id = id_to_parent.get(field.field_id)
+ fields: List[int] = []
+ while parent_id is not None:
Review Comment:
More of a style question. Do we first want to store it into `fields` we
could also already do the check.
##########
python/pyiceberg/schema.py:
##########
@@ -882,6 +929,57 @@ def index_by_id(schema_or_type: Union[Schema,
IcebergType]) -> Dict[int, NestedF
return visit(schema_or_type, _IndexById())
+class _IndexParents(SchemaVisitor[Dict[int, int]]):
+ def __init__(self) -> None:
+ self.id_to_parent: Dict[int, int] = {}
+ self.id_stack: List[int] = []
+
+ def before_field(self, field: NestedField) -> None:
+ self.id_stack.append(field.field_id)
+
+ def after_field(self, field: NestedField) -> None:
+ self.id_stack.pop()
+
+ def schema(self, schema: Schema, struct_result: Dict[int, int]) ->
Dict[int, int]:
+ return self.id_to_parent
+
+ def struct(self, struct: StructType, field_results: List[Dict[int, int]])
-> Dict[int, int]:
+ for field in struct.fields:
+ parent_id = self.id_stack[-1] if self.id_stack else None
+ if parent_id is not None:
+ # fields in the root struct are not added
+ self.id_to_parent[field.field_id] = parent_id
+
+ return self.id_to_parent
+
+ def field(self, field: NestedField, field_result: Dict[int, int]) ->
Dict[int, int]:
+ return self.id_to_parent
+
+ def list(self, list_type: ListType, element_result: Dict[int, int]) ->
Dict[int, int]:
+ self.id_to_parent[list_type.element_id] = self.id_stack[-1]
+ return self.id_to_parent
+
+ def map(self, map_type: MapType, key_result: Dict[int, int], value_result:
Dict[int, int]) -> Dict[int, int]:
+ self.id_to_parent[map_type.key_id] = self.id_stack[-1]
+ self.id_to_parent[map_type.value_id] = self.id_stack[-1]
+ return self.id_to_parent
+
+ def primitive(self, primitive: PrimitiveType) -> Dict[int, int]:
+ return self.id_to_parent
+
+
+def index_parents(schema_or_type: Union[Schema, IcebergType]) -> Dict[int,
int]:
Review Comment:
Should we make this one private to the module? I don't think we want to use
this outside of `schema.py`
```suggestion
def _index_parents(schema_or_type: Union[Schema, IcebergType]) -> Dict[int,
int]:
```
##########
python/pyiceberg/schema.py:
##########
@@ -88,6 +88,10 @@ def __init__(self, *fields: NestedField, **data: Any):
data["fields"] = fields
super().__init__(**data)
self._name_to_id = index_by_name(self)
+ if self.identifier_field_ids is not None:
Review Comment:
Since `Schema` derives from `IcebergBaseModel`, we can use Pydantic
validators:
https://github.com/apache/iceberg/blob/master/python/pyiceberg/table/metadata.py#L239-L241
With an after validator, we already check if the field is a `List[int]`, and
when it is none, the `default_factory=list` is applied.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]