Fokko commented on code in PR #8174:
URL: https://github.com/apache/iceberg/pull/8174#discussion_r1279439033


##########
python/pyiceberg/schema.py:
##########
@@ -766,7 +769,7 @@ def _(obj: MapType, visitor: SchemaVisitor[T]) -> T:
 
     visitor.before_map_value(obj.value_field)
     value_result = visit(obj.value_type, visitor)
-    visitor.after_list_element(obj.value_field)
+    visitor.after_map_value(obj.value_field)

Review Comment:
   Nice catch



##########
python/pyiceberg/schema.py:
##########
@@ -1367,3 +1382,258 @@ def _(file_type: DecimalType, read_type: IcebergType) 
-> IcebergType:
             raise ResolveError(f"Cannot reduce precision from {file_type} to 
{read_type}")
     else:
         raise ResolveError(f"Cannot promote an decimal to {read_type}")
+
+
+class UpdateSchema(ABC):

Review Comment:
   Do we need this ABC? It feels a bit Java-ish. The naming is also confusing: 
`UpdateSchema` is the ABC and `SchemaUpdate` is the implementation.



##########
python/pyiceberg/schema.py:
##########
@@ -201,7 +204,7 @@ def find_type(self, name_or_id: Union[str, int], 
case_sensitive: bool = True) ->
 
     @property
     def highest_field_id(self) -> int:
-        return visit(self.as_struct(), _FindLastFieldId())
+        return max(self._lazy_id_to_name.keys(), default=0)

Review Comment:
   Nice



##########
python/pyiceberg/table/__init__.py:
##########
@@ -152,6 +154,27 @@ def set_properties(self, **updates: str) -> Transaction:
         """
         return self._append_updates(SetPropertiesUpdate(updates=updates))
 
+    def add_column(

Review Comment:
   I don't think we want to have this API on the Transaction itself. I like 
what @rdblue is suggesting 
[here](https://github.com/apache/iceberg/pull/6323#discussion_r1217020366).
   
   This would look like:
   ```python
   # When you want to do multiple things
   with table.transaction() as t:
       t.updateSchema().addColumn("x", IntegerType()).commit()
       t.set_property(updated_at=str(datetime.now()))
   
   # or when you don't want to use a transaction:
   t.updateSchema().addColumn("x", IntegerType()).commit()    
   ```
   
   The `.commit()` will accumulate all the changes to the schema and combine 
them into a single `AddSchemaUpdate` that is appended to `self._updates`. 
This way we don't need `self._schema_update`, which seems a little brittle 
since we have to reset it to `None` again afterwards.



##########
python/tests/conftest.py:
##########
@@ -153,13 +153,19 @@ def table_schema_nested() -> Schema:
         NestedField(
             field_id=11,
             name="location",
-            field_type=ListType(
-                element_id=12,
-                element_type=StructType(
+            field_type=MapType(

Review Comment:
   Maybe it's better not to change this fixture and to create a new schema instead?



##########
python/pyiceberg/schema.py:
##########
@@ -1082,44 +1101,40 @@ def build_position_accessors(schema_or_type: 
Union[Schema, IcebergType]) -> Dict
     return visit(schema_or_type, _BuildPositionAccessors())
 
 
-class _FindLastFieldId(SchemaVisitor[int]):
-    """Traverses the schema to get the highest field-id."""
-
-    def schema(self, schema: Schema, struct_result: int) -> int:
-        return struct_result
-
-    def struct(self, struct: StructType, field_results: List[int]) -> int:
-        return max(field_results)
-
-    def field(self, field: NestedField, field_result: int) -> int:
-        return max(field.field_id, field_result)
-
-    def list(self, list_type: ListType, element_result: int) -> int:
-        return element_result
+@singledispatch

Review Comment:
   I think this `@singledispatch` is not needed:
   
   ```
   def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], 
next_id: Optional[Callable[[], int]] = None) -> Schema:
       """Traverses the schema, and sets new IDs."""
       return pre_order_visit(schema_or_type, 
_SetFreshIDs(next_id_func=next_id))
   ```



##########
python/pyiceberg/schema.py:
##########
@@ -1082,44 +1101,40 @@ def build_position_accessors(schema_or_type: 
Union[Schema, IcebergType]) -> Dict
     return visit(schema_or_type, _BuildPositionAccessors())
 
 
-class _FindLastFieldId(SchemaVisitor[int]):
-    """Traverses the schema to get the highest field-id."""
-
-    def schema(self, schema: Schema, struct_result: int) -> int:
-        return struct_result
-
-    def struct(self, struct: StructType, field_results: List[int]) -> int:
-        return max(field_results)
-
-    def field(self, field: NestedField, field_result: int) -> int:
-        return max(field.field_id, field_result)
-
-    def list(self, list_type: ListType, element_result: int) -> int:
-        return element_result
+@singledispatch
+def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], 
next_id: Optional[Callable[[], int]] = None) -> Schema:
+    """Traverses the schema, and sets new IDs."""
+    raise ValueError(f"Unsupported type: {schema_or_type}")
 
-    def map(self, map_type: MapType, key_result: int, value_result: int) -> 
int:
-        return max(key_result, value_result)
 
-    def primitive(self, primitive: PrimitiveType) -> int:
-        return 0
+@assign_fresh_schema_ids.register(Schema)
+def _(schema: Schema, next_id: Optional[Callable[[], int]] = None) -> Schema:
+    """Traverses the schema, and sets new IDs."""
+    return pre_order_visit(schema, _SetFreshIDs(next_id_func=next_id))
 
 
-def assign_fresh_schema_ids(schema: Schema) -> Schema:
+@assign_fresh_schema_ids.register(IcebergType)
+def _(type_var: IcebergType, next_id: Optional[Callable[[], int]] = None) -> 
Schema:
     """Traverses the schema, and sets new IDs."""
-    return pre_order_visit(schema, _SetFreshIDs())
+    return pre_order_visit(type_var, _SetFreshIDs(next_id_func=next_id))
 
 
 class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
     """Traverses the schema and assigns monotonically increasing ids."""
 
     counter: itertools.count  # type: ignore
     reserved_ids: Dict[int, int]
+    next_id_func: Optional[Callable[[], int]] = None
 
-    def __init__(self, start: int = 1) -> None:
+    def __init__(self, start: int = 1, next_id_func: Optional[Callable[[], 
int]] = None) -> None:
         self.counter = itertools.count(start)
         self.reserved_ids = {}
+        self.next_id_func = next_id_func
 
     def _get_and_increment(self) -> int:
+        if self.next_id_func:
+            return self.next_id_func()

Review Comment:
   I think we can combine them by wrapping a counter in a function:
   ```python
   ➜  iceberg git:(p_add_column) python3
   Python 3.9.6 (default, May  7 2023, 23:32:44) 
   [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
   Type "help", "copyright", "credits" or "license" for more information.
   >>> import itertools
   >>> c = itertools.count()
   >>> f = lambda: next(c)
   >>> f()
   0
   >>> f()
   1
   >>> f()
   2
   ```



##########
python/pyiceberg/schema.py:
##########
@@ -1082,44 +1101,40 @@ def build_position_accessors(schema_or_type: 
Union[Schema, IcebergType]) -> Dict
     return visit(schema_or_type, _BuildPositionAccessors())
 
 
-class _FindLastFieldId(SchemaVisitor[int]):
-    """Traverses the schema to get the highest field-id."""
-
-    def schema(self, schema: Schema, struct_result: int) -> int:
-        return struct_result
-
-    def struct(self, struct: StructType, field_results: List[int]) -> int:
-        return max(field_results)
-
-    def field(self, field: NestedField, field_result: int) -> int:
-        return max(field.field_id, field_result)
-
-    def list(self, list_type: ListType, element_result: int) -> int:
-        return element_result
+@singledispatch
+def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType], 
next_id: Optional[Callable[[], int]] = None) -> Schema:
+    """Traverses the schema, and sets new IDs."""
+    raise ValueError(f"Unsupported type: {schema_or_type}")
 
-    def map(self, map_type: MapType, key_result: int, value_result: int) -> 
int:
-        return max(key_result, value_result)
 
-    def primitive(self, primitive: PrimitiveType) -> int:
-        return 0
+@assign_fresh_schema_ids.register(Schema)
+def _(schema: Schema, next_id: Optional[Callable[[], int]] = None) -> Schema:
+    """Traverses the schema, and sets new IDs."""
+    return pre_order_visit(schema, _SetFreshIDs(next_id_func=next_id))
 
 
-def assign_fresh_schema_ids(schema: Schema) -> Schema:
+@assign_fresh_schema_ids.register(IcebergType)
+def _(type_var: IcebergType, next_id: Optional[Callable[[], int]] = None) -> 
Schema:
     """Traverses the schema, and sets new IDs."""
-    return pre_order_visit(schema, _SetFreshIDs())
+    return pre_order_visit(type_var, _SetFreshIDs(next_id_func=next_id))
 
 
 class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
     """Traverses the schema and assigns monotonically increasing ids."""
 
     counter: itertools.count  # type: ignore
     reserved_ids: Dict[int, int]
+    next_id_func: Optional[Callable[[], int]] = None
 
-    def __init__(self, start: int = 1) -> None:
+    def __init__(self, start: int = 1, next_id_func: Optional[Callable[[], 
int]] = None) -> None:
         self.counter = itertools.count(start)
         self.reserved_ids = {}
+        self.next_id_func = next_id_func
 
     def _get_and_increment(self) -> int:
+        if self.next_id_func:
+            return self.next_id_func()

Review Comment:
   I think we can combine them by wrapping a counter in a function:
   ```python
   ➜  iceberg git:(p_add_column) python3
   Python 3.9.6 (default, May  7 2023, 23:32:44) 
   [Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
   Type "help", "copyright", "credits" or "license" for more information.
   >>> import itertools
   >>> c = itertools.count()
   >>> f = lambda: next(c)
   >>> f()
   0
   >>> f()
   1
   >>> f()
   2
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to