Fokko commented on code in PR #8174:
URL: https://github.com/apache/iceberg/pull/8174#discussion_r1279439033
##########
python/pyiceberg/schema.py:
##########
@@ -766,7 +769,7 @@ def _(obj: MapType, visitor: SchemaVisitor[T]) -> T:
visitor.before_map_value(obj.value_field)
value_result = visit(obj.value_type, visitor)
- visitor.after_list_element(obj.value_field)
+ visitor.after_map_value(obj.value_field)
Review Comment:
Nice catch
##########
python/pyiceberg/schema.py:
##########
@@ -1367,3 +1382,258 @@ def _(file_type: DecimalType, read_type: IcebergType)
-> IcebergType:
raise ResolveError(f"Cannot reduce precision from {file_type} to
{read_type}")
else:
raise ResolveError(f"Cannot promote an decimal to {read_type}")
+
+
+class UpdateSchema(ABC):
Review Comment:
Do we need this ABC? It feels a bit Java-ish, and the naming is confusing:
`UpdateSchema` is the abstract base and `SchemaUpdate` is the implementation.
##########
python/pyiceberg/schema.py:
##########
@@ -201,7 +204,7 @@ def find_type(self, name_or_id: Union[str, int],
case_sensitive: bool = True) ->
@property
def highest_field_id(self) -> int:
- return visit(self.as_struct(), _FindLastFieldId())
+ return max(self._lazy_id_to_name.keys(), default=0)
Review Comment:
Nice
##########
python/pyiceberg/table/__init__.py:
##########
@@ -152,6 +154,27 @@ def set_properties(self, **updates: str) -> Transaction:
"""
return self._append_updates(SetPropertiesUpdate(updates=updates))
+ def add_column(
Review Comment:
I don't think we want to have this API on the Transaction itself. I like
what @rdblue is suggesting
[here](https://github.com/apache/iceberg/pull/6323#discussion_r1217020366).
This would look like:
```python
# When you want to do multiple things
with table.transaction() as t:
    t.updateSchema().addColumn("x", IntegerType()).commit()
    t.set_property(updated_at=str(datetime.now()))
# or when you don't want to use a transaction:
t.updateSchema().addColumn("x", IntegerType()).commit()
```
The `.commit()` will accumulate all the changes to the schema and combine
them into a single `AddSchemaUpdate` that is added to `self._updates`.
This way we don't need `self._schema_update`, which seems a little brittle
since we set it back to `None` again.
##########
python/tests/conftest.py:
##########
@@ -153,13 +153,19 @@ def table_schema_nested() -> Schema:
NestedField(
field_id=11,
name="location",
- field_type=ListType(
- element_id=12,
- element_type=StructType(
+ field_type=MapType(
Review Comment:
Maybe it's better not to change this fixture, and to create a new schema instead?
##########
python/pyiceberg/schema.py:
##########
@@ -1082,44 +1101,40 @@ def build_position_accessors(schema_or_type:
Union[Schema, IcebergType]) -> Dict
return visit(schema_or_type, _BuildPositionAccessors())
-class _FindLastFieldId(SchemaVisitor[int]):
- """Traverses the schema to get the highest field-id."""
-
- def schema(self, schema: Schema, struct_result: int) -> int:
- return struct_result
-
- def struct(self, struct: StructType, field_results: List[int]) -> int:
- return max(field_results)
-
- def field(self, field: NestedField, field_result: int) -> int:
- return max(field.field_id, field_result)
-
- def list(self, list_type: ListType, element_result: int) -> int:
- return element_result
+@singledispatch
Review Comment:
I think this `@singledispatch` is not needed:
```
def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType],
next_id: Optional[Callable[[], int]] = None) -> Schema:
"""Traverses the schema, and sets new IDs."""
return pre_order_visit(schema_or_type,
_SetFreshIDs(next_id_func=next_id))
```
##########
python/pyiceberg/schema.py:
##########
@@ -1082,44 +1101,40 @@ def build_position_accessors(schema_or_type:
Union[Schema, IcebergType]) -> Dict
return visit(schema_or_type, _BuildPositionAccessors())
-class _FindLastFieldId(SchemaVisitor[int]):
- """Traverses the schema to get the highest field-id."""
-
- def schema(self, schema: Schema, struct_result: int) -> int:
- return struct_result
-
- def struct(self, struct: StructType, field_results: List[int]) -> int:
- return max(field_results)
-
- def field(self, field: NestedField, field_result: int) -> int:
- return max(field.field_id, field_result)
-
- def list(self, list_type: ListType, element_result: int) -> int:
- return element_result
+@singledispatch
+def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType],
next_id: Optional[Callable[[], int]] = None) -> Schema:
+ """Traverses the schema, and sets new IDs."""
+ raise ValueError(f"Unsupported type: {schema_or_type}")
- def map(self, map_type: MapType, key_result: int, value_result: int) ->
int:
- return max(key_result, value_result)
- def primitive(self, primitive: PrimitiveType) -> int:
- return 0
+@assign_fresh_schema_ids.register(Schema)
+def _(schema: Schema, next_id: Optional[Callable[[], int]] = None) -> Schema:
+ """Traverses the schema, and sets new IDs."""
+ return pre_order_visit(schema, _SetFreshIDs(next_id_func=next_id))
-def assign_fresh_schema_ids(schema: Schema) -> Schema:
+@assign_fresh_schema_ids.register(IcebergType)
+def _(type_var: IcebergType, next_id: Optional[Callable[[], int]] = None) ->
Schema:
"""Traverses the schema, and sets new IDs."""
- return pre_order_visit(schema, _SetFreshIDs())
+ return pre_order_visit(type_var, _SetFreshIDs(next_id_func=next_id))
class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
"""Traverses the schema and assigns monotonically increasing ids."""
counter: itertools.count # type: ignore
reserved_ids: Dict[int, int]
+ next_id_func: Optional[Callable[[], int]] = None
- def __init__(self, start: int = 1) -> None:
+ def __init__(self, start: int = 1, next_id_func: Optional[Callable[[],
int]] = None) -> None:
self.counter = itertools.count(start)
self.reserved_ids = {}
+ self.next_id_func = next_id_func
def _get_and_increment(self) -> int:
+ if self.next_id_func:
+ return self.next_id_func()
Review Comment:
I think we can combine the counter and `next_id_func` by wrapping the counter in a function:
```python
➜ iceberg git:(p_add_column) python3
Python 3.9.6 (default, May 7 2023, 23:32:44)
[Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import itertools
>>> c = itertools.count()
>>> f = lambda: next(c)
>>> f()
0
>>> f()
1
>>> f()
2
```
##########
python/pyiceberg/schema.py:
##########
@@ -1082,44 +1101,40 @@ def build_position_accessors(schema_or_type:
Union[Schema, IcebergType]) -> Dict
return visit(schema_or_type, _BuildPositionAccessors())
-class _FindLastFieldId(SchemaVisitor[int]):
- """Traverses the schema to get the highest field-id."""
-
- def schema(self, schema: Schema, struct_result: int) -> int:
- return struct_result
-
- def struct(self, struct: StructType, field_results: List[int]) -> int:
- return max(field_results)
-
- def field(self, field: NestedField, field_result: int) -> int:
- return max(field.field_id, field_result)
-
- def list(self, list_type: ListType, element_result: int) -> int:
- return element_result
+@singledispatch
+def assign_fresh_schema_ids(schema_or_type: Union[Schema, IcebergType],
next_id: Optional[Callable[[], int]] = None) -> Schema:
+ """Traverses the schema, and sets new IDs."""
+ raise ValueError(f"Unsupported type: {schema_or_type}")
- def map(self, map_type: MapType, key_result: int, value_result: int) ->
int:
- return max(key_result, value_result)
- def primitive(self, primitive: PrimitiveType) -> int:
- return 0
+@assign_fresh_schema_ids.register(Schema)
+def _(schema: Schema, next_id: Optional[Callable[[], int]] = None) -> Schema:
+ """Traverses the schema, and sets new IDs."""
+ return pre_order_visit(schema, _SetFreshIDs(next_id_func=next_id))
-def assign_fresh_schema_ids(schema: Schema) -> Schema:
+@assign_fresh_schema_ids.register(IcebergType)
+def _(type_var: IcebergType, next_id: Optional[Callable[[], int]] = None) ->
Schema:
"""Traverses the schema, and sets new IDs."""
- return pre_order_visit(schema, _SetFreshIDs())
+ return pre_order_visit(type_var, _SetFreshIDs(next_id_func=next_id))
class _SetFreshIDs(PreOrderSchemaVisitor[IcebergType]):
"""Traverses the schema and assigns monotonically increasing ids."""
counter: itertools.count # type: ignore
reserved_ids: Dict[int, int]
+ next_id_func: Optional[Callable[[], int]] = None
- def __init__(self, start: int = 1) -> None:
+ def __init__(self, start: int = 1, next_id_func: Optional[Callable[[],
int]] = None) -> None:
self.counter = itertools.count(start)
self.reserved_ids = {}
+ self.next_id_func = next_id_func
def _get_and_increment(self) -> int:
+ if self.next_id_func:
+ return self.next_id_func()
Review Comment:
I think we can combine the counter and `next_id_func` by wrapping the counter in a function:
```python
➜ iceberg git:(p_add_column) python3
Python 3.9.6 (default, May 7 2023, 23:32:44)
[Clang 14.0.3 (clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import itertools
>>> c = itertools.count()
>>> f = lambda: next(c)
>>> f()
0
>>> f()
1
>>> f()
2
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]