Fokko commented on code in PR #8174:
URL: https://github.com/apache/iceberg/pull/8174#discussion_r1288116597


##########
python/pyiceberg/table/__init__.py:
##########
@@ -841,3 +868,210 @@ def to_ray(self) -> ray.data.dataset.Dataset:
         import ray
 
         return ray.data.from_arrow(self.to_arrow())
+
+
+class _SchemaUpdate(UpdateSchema):
+    def __init__(self, schema: Schema, table: Optional[Table] = None, 
last_column_id: Optional[int] = None):
+        self._table = table
+        self._schema = schema
+        if last_column_id:
+            self._last_column_id = last_column_id
+        else:
+            self._last_column_id = schema.highest_field_id
+
+        self._identifier_field_names = schema.column_names
+        self._adds: Dict[int, List[NestedField]] = {}
+        self._added_name_to_id: Dict[str, int] = {}
+        self._id_to_parent: Dict[int, str] = {}
+        self._allow_incompatible_changes: bool = False
+        self._case_sensitive: bool = True
+
+    def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
+        self._case_sensitive = case_sensitive
+        return self
+
+    def add_column(
+        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None
+    ) -> UpdateSchema:
+        if "." in name:
+            raise ValueError(f"Cannot add column with ambiguous name: {name}")
+
+        self._internal_add_column(parent, name, True, type_var, doc)
+        return self
+
+    def add_required_column(
+        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None
+    ) -> UpdateSchema:
+        if "." in name:
+            raise ValueError(f"Cannot add column with ambiguous name: {name}")
+
+        if not self._allow_incompatible_changes:
+            raise ValueError(f"Incompatible change: cannot add required 
column: {name}")

Review Comment:
   Can you add a comment here that this will be possible with spec V3, which 
sets an initial value?



##########
python/mkdocs/docs/api.md:
##########
@@ -146,6 +146,29 @@ catalog.create_table(
 )
 ```
 
+### Update table schema
+
+Add new columns through the `Transaction` or `UpdateSchema` API:
+
+Use the Transaction API:
+
+```python
+with table.transaction() as transaction:
+    transaction.update_schema().add_column("x", IntegerType(), "doc").commit()

Review Comment:
   The context manager will do the commit for you:
   ```suggestion
       transaction.update_schema().add_column("x", IntegerType(), "doc")
   ```



##########
python/pyiceberg/table/__init__.py:
##########
@@ -841,3 +868,210 @@ def to_ray(self) -> ray.data.dataset.Dataset:
         import ray
 
         return ray.data.from_arrow(self.to_arrow())
+
+
+class _SchemaUpdate(UpdateSchema):
+    def __init__(self, schema: Schema, table: Optional[Table] = None, 
last_column_id: Optional[int] = None):
+        self._table = table
+        self._schema = schema
+        if last_column_id:
+            self._last_column_id = last_column_id
+        else:
+            self._last_column_id = schema.highest_field_id
+
+        self._identifier_field_names = schema.column_names
+        self._adds: Dict[int, List[NestedField]] = {}
+        self._added_name_to_id: Dict[str, int] = {}
+        self._id_to_parent: Dict[int, str] = {}
+        self._allow_incompatible_changes: bool = False
+        self._case_sensitive: bool = True
+
+    def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
+        self._case_sensitive = case_sensitive
+        return self
+
+    def add_column(
+        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None
+    ) -> UpdateSchema:

Review Comment:
   What do you think of keeping this in a single function?
   ```suggestion
       def add_column(
           self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None, required: bool = False
       ) -> UpdateSchema:
   ```
   The majority of the columns being added will be optional.



##########
python/pyiceberg/schema.py:
##########
@@ -1082,45 +1100,23 @@ def build_position_accessors(schema_or_type: 
Union[Schema, IcebergType]) -> Dict
     return visit(schema_or_type, _BuildPositionAccessors())
 
 
-class _FindLastFieldId(SchemaVisitor[int]):
-    """Traverses the schema to get the highest field-id."""
-
-    def schema(self, schema: Schema, struct_result: int) -> int:
-        return struct_result
-
-    def struct(self, struct: StructType, field_results: List[int]) -> int:
-        return max(field_results)
-
-    def field(self, field: NestedField, field_result: int) -> int:
-        return max(field.field_id, field_result)
-
-    def list(self, list_type: ListType, element_result: int) -> int:
-        return element_result
-
-    def map(self, map_type: MapType, key_result: int, value_result: int) -> 
int:
-        return max(key_result, value_result)
-
-    def primitive(self, primitive: PrimitiveType) -> int:
-        return 0
-
-
-def assign_fresh_schema_ids(schema: Schema) -> Schema:
+def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], 
next_id: Optional[Callable[[], int]] = None) -> Schema:
     """Traverses the schema, and sets new IDs."""
-    return pre_order_visit(schema, _SetFreshIDs())
+    return pre_order_visit(schema_or_type, _SetFreshIDs(next_id_func=next_id))
 
 
 class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
     """Traverses the schema and assigns monotonically increasing ids."""
 
-    counter: itertools.count  # type: ignore
     reserved_ids: Dict[int, int]
 
-    def __init__(self, start: int = 1) -> None:
+    def __init__(self, start: int = 1, next_id_func: Optional[Callable[[], 
int]] = None) -> None:
         self.counter = itertools.count(start)

Review Comment:
   This one (`self.counter`) can go now.



##########
python/pyiceberg/table/__init__.py:
##########
@@ -841,3 +868,210 @@ def to_ray(self) -> ray.data.dataset.Dataset:
         import ray
 
         return ray.data.from_arrow(self.to_arrow())
+
+
+class _SchemaUpdate(UpdateSchema):
+    def __init__(self, schema: Schema, table: Optional[Table] = None, 
last_column_id: Optional[int] = None):

Review Comment:
   Looking at the code, `table` is always passed in, so it may be better to 
make it required:
   ```suggestion
       def __init__(self, schema: Schema, table: Table, last_column_id: 
Optional[int] = None):
   ```



##########
python/pyiceberg/table/__init__.py:
##########
@@ -841,3 +868,210 @@ def to_ray(self) -> ray.data.dataset.Dataset:
         import ray
 
         return ray.data.from_arrow(self.to_arrow())
+
+
+class _SchemaUpdate(UpdateSchema):
+    def __init__(self, schema: Schema, table: Optional[Table] = None, 
last_column_id: Optional[int] = None):
+        self._table = table
+        self._schema = schema
+        if last_column_id:
+            self._last_column_id = last_column_id
+        else:
+            self._last_column_id = schema.highest_field_id
+
+        self._identifier_field_names = schema.column_names
+        self._adds: Dict[int, List[NestedField]] = {}
+        self._added_name_to_id: Dict[str, int] = {}
+        self._id_to_parent: Dict[int, str] = {}
+        self._allow_incompatible_changes: bool = False
+        self._case_sensitive: bool = True
+
+    def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
+        self._case_sensitive = case_sensitive
+        return self
+
+    def add_column(
+        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None
+    ) -> UpdateSchema:
+        if "." in name:
+            raise ValueError(f"Cannot add column with ambiguous name: {name}")
+
+        self._internal_add_column(parent, name, True, type_var, doc)
+        return self
+
+    def add_required_column(
+        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None
+    ) -> UpdateSchema:
+        if "." in name:
+            raise ValueError(f"Cannot add column with ambiguous name: {name}")
+
+        if not self._allow_incompatible_changes:
+            raise ValueError(f"Incompatible change: cannot add required 
column: {name}")
+
+        self._internal_add_column(parent, name, False, type_var, doc)
+        return self
+
+    def allow_incompatible_changes(self) -> UpdateSchema:
+        self._allow_incompatible_changes = True
+        return self
+
+    def apply(self) -> Schema:

Review Comment:
   Should we make this one private?
   ```suggestion
       def _apply(self) -> Schema:
   ```



##########
python/pyiceberg/table/__init__.py:
##########
@@ -841,3 +868,210 @@ def to_ray(self) -> ray.data.dataset.Dataset:
         import ray
 
         return ray.data.from_arrow(self.to_arrow())
+
+
+class _SchemaUpdate(UpdateSchema):
+    def __init__(self, schema: Schema, table: Optional[Table] = None, 
last_column_id: Optional[int] = None):
+        self._table = table
+        self._schema = schema
+        if last_column_id:
+            self._last_column_id = last_column_id
+        else:
+            self._last_column_id = schema.highest_field_id
+
+        self._identifier_field_names = schema.column_names
+        self._adds: Dict[int, List[NestedField]] = {}
+        self._added_name_to_id: Dict[str, int] = {}
+        self._id_to_parent: Dict[int, str] = {}
+        self._allow_incompatible_changes: bool = False
+        self._case_sensitive: bool = True
+
+    def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
+        self._case_sensitive = case_sensitive
+        return self
+
+    def add_column(
+        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None
+    ) -> UpdateSchema:
+        if "." in name:
+            raise ValueError(f"Cannot add column with ambiguous name: {name}")
+
+        self._internal_add_column(parent, name, True, type_var, doc)
+        return self
+
+    def add_required_column(
+        self, name: str, type_var: IcebergType, doc: Optional[str] = None, 
parent: Optional[str] = None
+    ) -> UpdateSchema:
+        if "." in name:
+            raise ValueError(f"Cannot add column with ambiguous name: {name}")
+
+        if not self._allow_incompatible_changes:
+            raise ValueError(f"Incompatible change: cannot add required 
column: {name}")
+
+        self._internal_add_column(parent, name, False, type_var, doc)
+        return self
+
+    def allow_incompatible_changes(self) -> UpdateSchema:
+        self._allow_incompatible_changes = True
+        return self
+
+    def apply(self) -> Schema:
+        return _apply_changes(self._schema, self._adds, 
self._identifier_field_names)
+
+    def commit(self) -> None:
+        if self._table is None:
+            raise ValueError("Cannot commit schema update, table is not set")
+
+        # Strip the catalog name
+        self._table.catalog._commit_table(  # pylint: disable=W0212
+            CommitTableRequest(
+                identifier=self._table.identifier[1:],
+                updates=[AddSchemaUpdate(schema=self.apply())],
+            )
+        )
+
+    def _internal_add_column(
+        self, parent: Optional[str], name: str, is_optional: bool, type_var: 
IcebergType, doc: Optional[str]
+    ) -> None:
+        full_name: str = name
+        parent_id: int = TABLE_ROOT_ID
+
+        exist_field: Optional[NestedField] = None
+        if parent:
+            parent_field = self._schema.find_field(parent, 
self._case_sensitive)
+            parent_type = parent_field.field_type
+            if isinstance(parent_type, MapType):
+                parent_field = parent_type.value_field
+            elif isinstance(parent_type, ListType):
+                parent_field = parent_type.element_field
+
+            if not parent_field.field_type.is_struct:
+                raise ValueError(f"Cannot add column to non-struct type: 
{parent}")
+
+            parent_id = parent_field.field_id
+
+            try:
+                exist_field = self._schema.find_field(parent + "." + name, 
self._case_sensitive)
+            except ValueError:
+                pass
+
+            if exist_field:
+                raise ValueError(f"Cannot add column, name already exists: 
{parent}.{name}")
+
+            full_name = parent_field.name + "." + name
+
+        else:
+            try:
+                exist_field = self._schema.find_field(name, 
self._case_sensitive)
+            except ValueError:
+                pass
+
+            if exist_field:
+                raise ValueError(f"Cannot add column, name already exists: 
{name}")
+
+        # assign new IDs in order
+        new_id = self.assign_new_column_id()
+
+        # update tracking for moves
+        self._added_name_to_id[full_name] = new_id
+
+        new_type = assign_fresh_schema_ids(type_var, self.assign_new_column_id)
+        field = NestedField(new_id, name, new_type, not is_optional, doc)
+
+        self._adds.setdefault(parent_id, []).append(field)
+
+    def assign_new_column_id(self) -> int:

Review Comment:
   What do you think of using a counter here as well?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to