kevinjqliu commented on code in PR #2305:
URL: https://github.com/apache/iceberg-python/pull/2305#discussion_r2280604003
##########
tests/integration/test_partition_evolution.py:
##########
@@ -63,12 +64,19 @@ def _table_v2(catalog: Catalog) -> Table:
return _create_table_with_schema(catalog, schema_with_timestamp, "2")
-def _create_table_with_schema(catalog: Catalog, schema: Schema,
format_version: str) -> Table:
+def _create_table_with_schema(
+ catalog: Catalog, schema: Schema, format_version: str, partition_spec:
Optional[PartitionSpec] = None
+) -> Table:
tbl_name = "default.test_schema_evolution"
try:
catalog.drop_table(tbl_name)
except NoSuchTableError:
pass
+
+ if partition_spec:
+ return catalog.create_table(
+ identifier=tbl_name, schema=schema, partition_spec=partition_spec,
properties={"format-version": format_version}
+ )
return catalog.create_table(identifier=tbl_name, schema=schema,
properties={"format-version": format_version})
Review Comment:
And then, once `partition_spec` has a default value, we can collapse both branches into a single call:
```suggestion
return catalog.create_table(
identifier=tbl_name, schema=schema,
partition_spec=partition_spec, properties={"format-version": format_version}
)
```
##########
pyiceberg/table/update/spec.py:
##########
@@ -174,16 +174,12 @@ def _commit(self) -> UpdatesAndRequirements:
return updates, requirements
def _apply(self) -> PartitionSpec:
- def _check_and_add_partition_name(schema: Schema, name: str,
source_id: int, partition_names: Set[str]) -> None:
- try:
- field = schema.find_field(name)
- except ValueError:
- field = None
-
- if source_id is not None and field is not None and field.field_id
!= source_id:
- raise ValueError(f"Cannot create identity partition from a
different field in the schema {name}")
- elif field is not None and source_id != field.field_id:
- raise ValueError(f"Cannot create partition from name that
exists in schema {name}")
+ def _check_and_add_partition_name(
+ schema: Schema, name: str, source_id: int, transform:
Transform[Any, Any], partition_names: Set[str]
+ ) -> None:
+ from pyiceberg.partitioning import validate_partition_name
+
+ validate_partition_name(name, transform, source_id, schema)
if not name:
Review Comment:
What do you think about moving L183-L186 into `validate_partition_name` to mirror the
Java implementation?
https://github.com/apache/iceberg/blob/4dbc7f578eee7ceb9def35ebfa1a4cc236fb598f/api/src/main/java/org/apache/iceberg/PartitionSpec.java#L412-L414
##########
tests/integration/test_partition_evolution.py:
##########
@@ -63,12 +64,19 @@ def _table_v2(catalog: Catalog) -> Table:
return _create_table_with_schema(catalog, schema_with_timestamp, "2")
-def _create_table_with_schema(catalog: Catalog, schema: Schema,
format_version: str) -> Table:
+def _create_table_with_schema(
+ catalog: Catalog, schema: Schema, format_version: str, partition_spec:
Optional[PartitionSpec] = None
+) -> Table:
Review Comment:
Consider defaulting to `UNPARTITIONED_PARTITION_SPEC`, following the other create-table helpers in the tests, for example
https://github.com/apache/iceberg-python/blob/80135451d030569259d83674ef147e0d6f62fd51/tests/integration/test_register_table.py#L40-L59
```suggestion
def _create_table_with_schema(
catalog: Catalog, schema: Schema, format_version: str, partition_spec:
PartitionSpec = UNPARTITIONED_PARTITION_SPEC
) -> Table:
```
##########
pyiceberg/partitioning.py:
##########
@@ -249,6 +249,26 @@ def partition_to_path(self, data: Record, schema: Schema)
-> str:
UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
+def validate_partition_name(
+ field_name: str,
+ partition_transform: Transform[Any, Any],
+ source_id: int,
+ schema: Schema,
+) -> None:
+ """Validate that a partition field name doesn't conflict with schema field
names."""
+ try:
+ schema_field = schema.find_field(field_name)
+ except ValueError:
+ return # No conflict if field doesn't exist in schema
+
+ if isinstance(partition_transform, (IdentityTransform, VoidTransform)):
+ # For identity transforms, allow conflict only if sourced from the
same schema field
+ if schema_field.field_id != source_id:
+ raise ValueError(f"Cannot create identity partition from a
different source field in the schema: {field_name}")
+ else:
Review Comment:
Match the Java error message:
```suggestion
raise ValueError(f"Cannot create identity partition sourced from
different field in schema: {field_name}")
else:
```
##########
pyiceberg/table/update/spec.py:
##########
@@ -244,6 +240,13 @@ def _add_new_field(
partition_fields.append(new_field)
for added_field in self._adds:
+ _check_and_add_partition_name(
+ self._transaction.table_metadata.schema(),
+ added_field.name,
+ added_field.source_id,
+ added_field.transform,
+ partition_names,
+ )
Review Comment:
Good catch. Just to confirm: does this cover the newly added partition fields?
##########
pyiceberg/partitioning.py:
##########
@@ -249,6 +249,26 @@ def partition_to_path(self, data: Record, schema: Schema)
-> str:
UNPARTITIONED_PARTITION_SPEC = PartitionSpec(spec_id=0)
+def validate_partition_name(
+ field_name: str,
+ partition_transform: Transform[Any, Any],
+ source_id: int,
+ schema: Schema,
+) -> None:
+ """Validate that a partition field name doesn't conflict with schema field
names."""
+ try:
+ schema_field = schema.find_field(field_name)
+ except ValueError:
+ return # No conflict if field doesn't exist in schema
+
+ if isinstance(partition_transform, (IdentityTransform, VoidTransform)):
+ # For identity transforms, allow conflict only if sourced from the
same schema field
Review Comment:
```suggestion
# For identity and void transforms, allow conflict only if sourced
from the same schema field
```
##########
pyiceberg/table/update/schema.py:
##########
@@ -658,6 +658,14 @@ def _apply(self) -> Schema:
# Check the field-ids
new_schema = Schema(*struct.fields)
+ if self._transaction is not None:
+ from pyiceberg.partitioning import validate_partition_name
+
+ for spec in self._transaction.table_metadata.partition_specs:
+ for partition_field in spec.fields:
+ validate_partition_name(
+ partition_field.name, partition_field.transform,
partition_field.source_id, new_schema
+ )
Review Comment:
I think there should always be a `self._transaction`, so the `is not None` check can be dropped:
```suggestion
from pyiceberg.partitioning import validate_partition_name
for spec in self._transaction.table_metadata.partition_specs:
for partition_field in spec.fields:
validate_partition_name(
partition_field.name, partition_field.transform,
partition_field.source_id, new_schema
)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]