HonahX commented on code in PR #245: URL: https://github.com/apache/iceberg-python/pull/245#discussion_r1469034571
########## pyiceberg/table/__init__.py: ########## @@ -533,6 +551,39 @@ def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: _Ta return base_metadata.model_copy(update={"current_schema_id": new_schema_id}) +@_apply_table_update.register(AddPartitionSpecUpdate) +def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + for spec in base_metadata.partition_specs: + if spec.spec_id == update.spec_id: + raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}") + + context.add_update(update) + return base_metadata.model_copy( + update={ + "partition_specs": base_metadata.partition_specs + [update.spec], + } + ) + + +@_apply_table_update.register(SetDefaultSpecUpdate) +def _(update: SetDefaultSpecUpdate, base_metadata: TableMetadata, context: _TableMetadataUpdateContext) -> TableMetadata: + new_spec_id = update.spec_id + if new_spec_id == base_metadata.default_spec_id: Review Comment: Shall we add support for a `-1` spec_id, which represents the last spec added in this transaction? https://github.com/apache/iceberg/blob/6852278d6cf01bec2998be954d7bd6f6cc37cc94/open-api/rest-catalog-open-api.yaml#L2242 ########## pyiceberg/table/__init__.py: ########## @@ -2271,3 +2325,240 @@ def commit(self) -> Snapshot: ) return snapshot + + +class UpdateSpec: + _table: Table + _name_to_field: Dict[str, PartitionField] = {} + _name_to_added_field: Dict[str, PartitionField] = {} + _transform_to_field: Dict[Tuple[int, str], PartitionField] = {} + _transform_to_added_field: Dict[Tuple[int, str], PartitionField] = {} + _renames: Dict[str, str] = {} + _added_time_fields: Dict[int, PartitionField] = {} + _case_sensitive: bool + _adds: List[PartitionField] + _deletes: Set[int] + _last_assigned_partition_id: int + _transaction: Optional[Transaction] + _unassigned_field_name = 'unassigned_field_name' Review Comment: Thanks for the analysis!
I agree that we should keep the validation. I was thinking that a little refactoring might make things look better. I've left a comment at the place where we provide the name for the new field. Please let me know what you think. ########## pyiceberg/table/__init__.py: ########## @@ -2271,3 +2325,243 @@ def commit(self) -> Snapshot: ) return snapshot + + +class UpdateSpec: + _table: Table + _name_to_field: Dict[str, PartitionField] = {} + _name_to_added_field: Dict[str, PartitionField] = {} + _transform_to_field: Dict[Tuple[int, str], PartitionField] = {} + _transform_to_added_field: Dict[Tuple[int, str], PartitionField] = {} + _renames: Dict[str, str] = {} + _added_time_fields: Dict[int, PartitionField] = {} + _case_sensitive: bool + _adds: List[PartitionField] + _deletes: Set[int] + _last_assigned_partition_id: int + _transaction: Optional[Transaction] + _unassigned_field_name = 'unassigned_field_name' + + def __init__(self, table: Table, transaction: Optional[Transaction] = None, case_sensitive: bool = True) -> None: + self._table = table + self._name_to_field = {field.name: field for field in table.spec().fields} + self._name_to_added_field = {} + self._transform_to_field = {(field.source_id, repr(field.transform)): field for field in table.spec().fields} + self._transform_to_added_field = {} + self._adds = [] + self._deletes = set() + if len(table.specs()) == 1: + self._last_assigned_partition_id = PARTITION_FIELD_ID_START - 1 + else: + self._last_assigned_partition_id = table.spec().last_assigned_field_id + self._renames = {} + self._transaction = transaction + self._case_sensitive = case_sensitive + self._added_time_fields = {} + + def add_field( + self, + source_column_name: str, + transform: Transform[Any, Any], + partition_field_name: Optional[str] = _unassigned_field_name, + ) -> UpdateSpec: + ref = Reference(source_column_name) + bound_ref = ref.bind(self._table.schema(), self._case_sensitive) + # verify transform can actually bind it + output_type =
bound_ref.field.field_type + if not transform.can_transform(output_type): + raise ValueError(f"{transform} cannot transform {output_type} values from {bound_ref.field.name}") + + transform_key = (bound_ref.field.field_id, repr(transform)) + existing_partition_field = self._transform_to_field.get(transform_key) + if existing_partition_field and self._is_duplicate_partition(transform, existing_partition_field): + raise ValueError(f"Duplicate partition field for ${ref.name}=${ref}, ${existing_partition_field} already exists") + + added = self._transform_to_added_field.get(transform_key) + if added: + raise ValueError(f"Already added partition: {added.name}") + + new_field = self._partition_field((bound_ref.field.field_id, transform), partition_field_name) + if new_field.name == self._unassigned_field_name: Review Comment: How about moving the unassigned case inside `_partition_field`? This way, we only need a None-check inside `_partition_field`, and 'unassigned_field_name' becomes just a temporary name for a temporary PartitionField. ```python def _partition_field(self, transform_key: Tuple[int, Transform[Any, Any]], name: Optional[str]) -> PartitionField: ... if name is None: tmp_field = PartitionField(transform_key[0], self._new_field_id(), transform_key[1], "unassigned_field_name") name = visit_partition_field(self._table.schema(), tmp_field, _PartitionNameGenerator()) return PartitionField(transform_key[0], self._new_field_id(), transform_key[1], name) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org