rdblue commented on code in PR #8374:
URL: https://github.com/apache/iceberg/pull/8374#discussion_r1313620802
##########
python/pyiceberg/table/__init__.py:
##########
@@ -934,206 +976,630 @@ def case_sensitive(self, case_sensitive: bool) ->
UpdateSchema:
return self
def add_column(
- self, name: str, type_var: IcebergType, doc: Optional[str] = None,
parent: Optional[str] = None, required: bool = False
+ self, path: Union[str, Tuple[str, ...]], field_type: IcebergType, doc:
Optional[str] = None, required: bool = False
) -> UpdateSchema:
"""Add a new column to a nested struct or Add a new top-level column.
+ Because "." may be interpreted as a column path separator or may be
used in field names, it
+ is not allowed to add nested column by passing in a string. To add to
nested structures or
+ to add fields with names that contain "." use a tuple instead to
indicate the path.
+
+ If type is a nested type, its field IDs are reassigned when added to
the existing schema.
+
Args:
- name: Name for the new column.
- type_var: Type for the new column.
+ path: Name for the new column.
+ field_type: Type for the new column.
doc: Documentation string for the new column.
- parent: Name of the parent struct to the column will be added to.
required: Whether the new column is required.
Returns:
- This for method chaining
+ This for method chaining.
"""
- if "." in name:
- raise ValueError(f"Cannot add column with ambiguous name: {name}")
+ if isinstance(path, str):
+ if "." in path:
+ raise ValueError(f"Cannot add column with ambiguous name:
{path}, provide a tuple instead")
+ path = (path,)
if required and not self._allow_incompatible_changes:
# Table format version 1 and 2 cannot add required column because
there is no initial value
- raise ValueError(f"Incompatible change: cannot add required
column: {name}")
+ raise ValueError(f'Incompatible change: cannot add required
column: {".".join(path)}')
+
+ name = path[-1]
+ parent = path[:-1]
+
+ full_name = ".".join(path)
+ parent_full_path = ".".join(parent)
+ parent_id: int = TABLE_ROOT_ID
+
+ if len(parent) > 0:
+ parent_field = self._schema.find_field(parent_full_path,
self._case_sensitive)
+ parent_type = parent_field.field_type
+ if isinstance(parent_type, MapType):
+ parent_field = parent_type.value_field
+ elif isinstance(parent_type, ListType):
+ parent_field = parent_type.element_field
+
+ if not parent_field.field_type.is_struct:
+ raise ValueError(f"Cannot add column '{name}' to non-struct
type: {'.'.join(parent)}")
+
+ parent_id = parent_field.field_id
+
+ existing_field = None
+ try:
+ existing_field = self._schema.find_field(full_name,
self._case_sensitive)
+ except ValueError:
+ pass
+
+ if existing_field is not None and existing_field.field_id not in
self._deletes:
+ raise ValueError(f"Cannot add column, name already exists:
{full_name}")
+
+ # assign new IDs in order
+ new_id = self.assign_new_column_id()
+
+ # update tracking for moves
+ self._added_name_to_id[full_name] = new_id
+ self._id_to_parent[new_id] = parent_full_path
+
+ new_type = assign_fresh_schema_ids(field_type,
self.assign_new_column_id)
+ field = NestedField(field_id=new_id, name=name, field_type=new_type,
required=required, doc=doc)
+
+ if parent_id in self._adds:
+ self._adds[parent_id].append(field)
+ else:
+ self._adds[parent_id] = [field]
- self._internal_add_column(parent, name, not required, type_var, doc)
return self
- def allow_incompatible_changes(self) -> UpdateSchema:
- """Allow incompatible changes to the schema.
+ def delete_column(self, path: Union[str, Tuple[str, ...]]) -> UpdateSchema:
+ """Delete a column from a table.
+
+ Args:
+ path: The path to the column.
Returns:
- This for method chaining
+ The UpdateSchema with the delete operation staged.
"""
- self._allow_incompatible_changes = True
+ name = (path,) if isinstance(path, str) else path
+ full_name = ".".join(name)
+
+ field = self._schema.find_field(full_name,
case_sensitive=self._case_sensitive)
+
+ if field.field_id in self._adds:
+ raise ValueError(f"Cannot delete a column that has additions:
{full_name}")
+ if field.field_id in self._updates:
+ raise ValueError(f"Cannot delete a column that has updates:
{full_name}")
+
+ self._deletes.add(field.field_id)
+
return self
- def commit(self) -> None:
- """Apply the pending changes and commit."""
- new_schema = self._apply()
- updates = [
- AddSchemaUpdate(schema=new_schema,
last_column_id=new_schema.highest_field_id),
- SetCurrentSchemaUpdate(schema_id=-1),
- ]
- requirements =
[AssertCurrentSchemaId(current_schema_id=self._schema.schema_id)]
+ def rename_column(self, path_from: Union[str, Tuple[str, ...]], new_name:
str) -> UpdateSchema:
+ """Update the name of a column.
+
+ Args:
+ path_from: The path to the column to be renamed.
+ new_name: The new path of the column.
- if self._transaction is not None:
- self._transaction._append_updates(*updates) # pylint:
disable=W0212
- self._transaction._append_requirements(*requirements) # pylint:
disable=W0212
+ Returns:
+ The UpdateSchema with the rename operation staged.
+ """
+ path_from = ".".join(path_from) if isinstance(path_from, tuple) else
path_from
+ field_from = self._schema.find_field(path_from, self._case_sensitive)
+
+ if field_from.field_id in self._deletes:
+ raise ValueError(f"Cannot rename a column that will be deleted:
{path_from}")
+
+ if updated := self._updates.get(field_from.field_id):
+ self._updates[field_from.field_id] = NestedField(
+ field_id=updated.field_id,
+ name=new_name,
+ field_type=updated.field_type,
+ doc=updated.doc,
+ required=updated.required,
+ )
else:
- table_update_response = self._table.catalog._commit_table( #
pylint: disable=W0212
- CommitTableRequest(identifier=self._table.identifier[1:],
updates=updates, requirements=requirements)
+ self._updates[field_from.field_id] = NestedField(
+ field_id=field_from.field_id,
+ name=new_name,
+ field_type=field_from.field_type,
+ doc=field_from.doc,
+ required=field_from.required,
)
- self._table.metadata = table_update_response.metadata
- self._table.metadata_location =
table_update_response.metadata_location
- def _apply(self) -> Schema:
- """Apply the pending changes to the original schema and returns the
result.
+ if path_from in self._identifier_field_names:
+ self._identifier_field_names.remove(path_from)
+
self._identifier_field_names.add(f"{path_from[:-len(field_from.name)]}{new_name}")
+
+ return self
+
+ def require_column(self, path: Union[str, Tuple[str, ...]]) ->
UpdateSchema:
Review Comment:
Isn't this `update_column('a', required=True)`? Is this intended to be a
nicer way to set that?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]