amogh-jahagirdar commented on code in PR #245: URL: https://github.com/apache/iceberg-python/pull/245#discussion_r1476173833
########## pyiceberg/table/__init__.py: ########## @@ -2271,3 +2331,242 @@ def commit(self) -> Snapshot: ) return snapshot + + +class UpdateSpec: + _table: Table + _name_to_field: Dict[str, PartitionField] = {} + _name_to_added_field: Dict[str, PartitionField] = {} + _transform_to_field: Dict[Tuple[int, str], PartitionField] = {} + _transform_to_added_field: Dict[Tuple[int, str], PartitionField] = {} + _renames: Dict[str, str] = {} + _added_time_fields: Dict[int, PartitionField] = {} + _case_sensitive: bool + _adds: List[PartitionField] + _deletes: Set[int] + _last_assigned_partition_id: int + _transaction: Optional[Transaction] + + def __init__(self, table: Table, transaction: Optional[Transaction] = None, case_sensitive: bool = True) -> None: + self._table = table + self._name_to_field = {field.name: field for field in table.spec().fields} + self._name_to_added_field = {} + self._transform_to_field = {(field.source_id, repr(field.transform)): field for field in table.spec().fields} + self._transform_to_added_field = {} + self._adds = [] + self._deletes = set() + if last_partition_id := table.last_partition_id(): + self._last_assigned_partition_id = last_partition_id + else: + self._last_assigned_partition_id = PARTITION_FIELD_ID_START - 1 + self._renames = {} + self._transaction = transaction + self._case_sensitive = case_sensitive + self._added_time_fields = {} + + def add_field( + self, + source_column_name: str, + transform: Transform[Any, Any], + partition_field_name: Optional[str] = None, + ) -> UpdateSpec: + ref = Reference(source_column_name) + bound_ref = ref.bind(self._table.schema(), self._case_sensitive) + # verify transform can actually bind it + output_type = bound_ref.field.field_type + if not transform.can_transform(output_type): + raise ValueError(f"{transform} cannot transform {output_type} values from {bound_ref.field.name}") + + transform_key = (bound_ref.field.field_id, repr(transform)) + existing_partition_field = 
self._transform_to_field.get(transform_key) + if existing_partition_field and self._is_duplicate_partition(transform, existing_partition_field): + raise ValueError(f"Duplicate partition field for ${ref.name}=${ref}, ${existing_partition_field} already exists") + + added = self._transform_to_added_field.get(transform_key) + if added: + raise ValueError(f"Already added partition: {added.name}") + + new_field = self._partition_field((bound_ref.field.field_id, transform), partition_field_name) + if new_field.name in self._name_to_added_field: + raise ValueError(f"Already added partition field with name: {new_field.name}") + + self._redundant_time_partition(new_field) + self._transform_to_added_field[transform_key] = new_field + + existing_partition_field = self._name_to_field.get(new_field.name) + if existing_partition_field and new_field.field_id not in self._deletes: + if isinstance(existing_partition_field.transform, VoidTransform): + self.rename_field( + existing_partition_field.name, existing_partition_field.name + "_" + str(existing_partition_field.field_id) + ) + else: + raise ValueError(f"Cannot add duplicate partition field name: {existing_partition_field.name}") + + self._name_to_added_field[new_field.name] = new_field + self._adds.append(new_field) + return self + + def add_identity(self, source_column_name: str) -> UpdateSpec: + return self.add_field(source_column_name, IdentityTransform(), None) + + def remove_field(self, name: str) -> UpdateSpec: + added = self._name_to_added_field.get(name) + if added: + raise ValueError(f"Cannot delete newly added field {name}") + renamed = self._renames.get(name) + if renamed: + raise ValueError(f"Cannot rename and delete field {name}") + field = self._name_to_field.get(name) + if not field: + raise ValueError(f"No such partition field: {name}") + + self._deletes.add(field.field_id) + return self + + def rename_field(self, name: str, new_name: str) -> UpdateSpec: + existing_field = self._name_to_field.get(new_name) + 
if existing_field and isinstance(existing_field.transform, VoidTransform): + return self.rename_field(name, name + "_" + str(existing_field.field_id)) + added = self._name_to_added_field.get(name) + if added: + raise ValueError("Cannot rename recently added partitions") + field = self._name_to_field.get(name) + if not field: + raise ValueError(f"Cannot find partition field {name}") + if field.field_id in self._deletes: + raise ValueError(f"Cannot delete and rename partition field {name}") + self._renames[name] = new_name + return self + + def commit(self) -> None: + new_spec = self._apply() + if self._table.metadata.default_spec_id != new_spec.spec_id: + if new_spec.spec_id not in self._table.specs(): + updates = [AddPartitionSpecUpdate(spec=new_spec), SetDefaultSpecUpdate(spec_id=-1)] + else: + updates = [SetDefaultSpecUpdate(spec_id=new_spec.spec_id)] + + if last_partition_id := self._table.last_partition_id(): + required_last_assigned_partitioned_id = last_partition_id + else: + required_last_assigned_partitioned_id = PARTITION_FIELD_ID_START - 1 + + requirements = [AssertLastAssignedPartitionId(last_assigned_partition_id=required_last_assigned_partitioned_id)] + + if self._transaction is not None: + self._transaction._append_updates(*updates) # pylint: disable=W0212 + self._transaction._append_requirements(*requirements) # pylint: disable=W0212 + else: + requirements.append(AssertDefaultSpecId(default_spec_id=self._table.spec().spec_id)) + self._table._do_commit(updates=tuple(updates), requirements=tuple(requirements)) # pylint: disable=W0212 + + def __exit__(self, _: Any, value: Any, traceback: Any) -> None: + """Close and commit the change.""" + return self.commit() + + def __enter__(self) -> UpdateSpec: + """Update the table.""" + return self + + def _apply(self) -> PartitionSpec: + def _check_and_add_partition_name(schema: Schema, name: str, source_id: int, partition_names: Set[str]) -> None: + try: + field = schema.find_field(name) + except ValueError: + 
field = None + + if source_id is not None and field is not None and field.field_id != source_id: + raise ValueError(f"Cannot create identity partition from a different field in the schema {name}") + elif field is not None and source_id != field.field_id: + raise ValueError(f"Cannot create partition from name that exists in schema {name}") + if not name: + raise ValueError("Undefined name") + if name in partition_names: + raise ValueError(f"Partition name has to be unique: {name}") + partition_names.add(name) + + def _add_new_field( + schema: Schema, source_id: int, field_id: int, name: str, transform: Transform[Any, Any], partition_names: Set[str] + ) -> PartitionField: + _check_and_add_partition_name(schema, name, source_id, partition_names) + return PartitionField(source_id, field_id, transform, name) + + partition_fields = [] + partition_names: Set[str] = set() + for field in self._table.spec().fields: + if field.field_id not in self._deletes: + renamed = self._renames.get(field.name) + if renamed: + new_field = _add_new_field( + self._table.schema(), field.source_id, field.field_id, renamed, field.transform, partition_names + ) + else: + new_field = _add_new_field( + self._table.schema(), field.source_id, field.field_id, field.name, field.transform, partition_names + ) + partition_fields.append(new_field) + elif self._table.format_version == 1: + renamed = self._renames.get(field.name) + if renamed: + new_field = _add_new_field( + self._table.schema(), field.source_id, field.field_id, renamed, VoidTransform(), partition_names + ) + else: + new_field = _add_new_field( + self._table.schema(), field.source_id, field.field_id, field.name, VoidTransform(), partition_names + ) + + partition_fields.append(new_field) + + for added_field in self._adds: + new_field = PartitionField( + source_id=added_field.source_id, + field_id=added_field.field_id, + transform=added_field.transform, + name=added_field.name, + ) + partition_fields.append(new_field) + + # Reuse spec id or 
create a new one. + new_spec = PartitionSpec(*partition_fields) + new_spec_id = INITIAL_PARTITION_SPEC_ID + for spec in self._table.specs().values(): + if new_spec.compatible_with(spec): + new_spec_id = spec.spec_id + break + elif new_spec_id <= spec.spec_id: + new_spec_id = spec.spec_id + 1 + return PartitionSpec(*partition_fields, spec_id=new_spec_id) + + def _redundant_time_partition(self, field: PartitionField) -> None: + if isinstance(field.transform, TimeTransform): + existing_time_field = self._added_time_fields.get(field.source_id) + if existing_time_field: + raise ValueError(f"Cannot add time partition field: {field.name} conflicts with {existing_time_field.name}") + self._added_time_fields[field.source_id] = field + + def _partition_field(self, transform_key: Tuple[int, Transform[Any, Any]], name: Optional[str]) -> PartitionField: + if self._table.metadata.format_version == 2: + source_id, transform = transform_key + historical_fields = [] + for spec in self._table.specs().values(): + for field in spec.fields: + historical_fields.append((field.source_id, repr(field.transform), field.name)) + + for field_key in historical_fields: + if field_key[0] == source_id and field_key[1] == repr(transform): + if name is not None or field_key[2] == name: + return field + Review Comment: There's a bug somewhere here in the reuse logic for v2, let me fix that. ``` def test_multiple_remove_and_add_reuses_v2(table_v2: Table) -> None: with table_v2.update_spec() as update: update.add_field("id", BucketTransform(16), "bucketed_id") update.add_field("event_ts", DayTransform(), "day_ts") with table_v2.update_spec() as update: update.remove_field("day_ts").remove_field("bucketed_id") with table_v2.update_spec() as update: update.add_field("id", BucketTransform(16), "bucketed_id") ``` One would expect the bucketed_id to be the only one remaining, but for some reason the DayTransform is the one that gets reused and added back. [Editor's note: the likely cause is visible in the quoted `_partition_field` above. `return field` returns the variable left over from the earlier `for field in spec.fields` loop (the last historical field iterated, here `day_ts`) rather than the entry matched by `field_key`, and the guard `name is not None or field_key[2] == name` is true whenever a name is supplied.]
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org