Fokko commented on code in PR #139:
URL: https://github.com/apache/iceberg-python/pull/139#discussion_r1409365694


##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]
+    last_added_schema_id: Optional[int]
+
+    def __init__(self) -> None:
+        self.updates = []
+        self.last_added_schema_id = None
+
+    def is_added_snapshot(self, snapshot_id: int) -> bool:
+        return any(
+            update.snapshot.snapshot_id == snapshot_id
+            for update in self.updates
+            if update.action == TableUpdateAction.add_snapshot
+        )
+
+    def is_added_schema(self, schema_id: int) -> bool:
+        return any(
+            update.schema_.schema_id == schema_id for update in self.updates 
if update.action == TableUpdateAction.add_schema
+        )
+
+
+@singledispatch
+def apply_table_update(update: TableUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    """Apply a table update to the table metadata.
+
+    Args:
+        update: The update to be applied.
+        base_metadata: The base metadata to be updated.
+        context: Contains previous updates, last_added_snapshot_id and other 
change tracking information in the current transaction.
+
+    Returns:
+        The updated metadata.
+
+    """
+    raise NotImplementedError(f"Unsupported table update: {update}")
+
+
+@apply_table_update.register(UpgradeFormatVersionUpdate)
+def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION:
+        raise ValueError(f"Unsupported table format version: 
{update.format_version}")
+
+    if update.format_version < base_metadata.format_version:
+        raise ValueError(f"Cannot downgrade v{base_metadata.format_version} 
table to v{update.format_version}")
+
+    if update.format_version == base_metadata.format_version:
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["format-version"] = update.format_version
+
+    context.updates.append(update)
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(AddSchemaUpdate)
+def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    def reuse_or_create_new_schema_id(new_schema: Schema) -> Tuple[int, bool]:
+        """Reuse schema id if schema already exists, otherwise create a new 
one.
+
+        Args:
+            new_schema: The new schema to be added.
+
+        Returns:
+            The new schema id and whether the schema already exists.
+        """
+        result_schema_id = base_metadata.current_schema_id
+        for schema in base_metadata.schemas:
+            if schema == new_schema:
+                return schema.schema_id, True
+            elif schema.schema_id >= result_schema_id:
+                result_schema_id = schema.schema_id + 1
+        return result_schema_id, False
+
+    if update.last_column_id < base_metadata.last_column_id:
+        raise ValueError(f"Invalid last column id {update.last_column_id}, 
must be >= {base_metadata.last_column_id}")
+
+    new_schema_id, schema_found = reuse_or_create_new_schema_id(update.schema_)
+    if schema_found and update.last_column_id == base_metadata.last_column_id:
+        if context.last_added_schema_id is not None and 
context.is_added_schema(new_schema_id):
+            context.last_added_schema_id = new_schema_id
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["last-column-id"] = update.last_column_id
+
+    new_schema = (
+        update.schema_
+        if new_schema_id == update.schema_.schema_id
+        else Schema(*update.schema_.fields, schema_id=new_schema_id, 
identifier_field_ids=update.schema_.identifier_field_ids)
+    )

Review Comment:
   > Thanks for the explanation! Just to confirm my understanding, in 
pyiceberg, one transaction should only have one AddSchemaUpdate because all 
updates related to the schema should be accumulated via UpdateSchema.
   
   This is correct. Currently, the `Update` has to be unique, so stacking two 
updates will even raise a `ValueError`.
   
   > Hence, we can trust the schema_id in the update and do not need the logic 
to handle multiple schema added in one transaction.
   
   This is correct 👍 



##########
pyiceberg/table/__init__.py:
##########
@@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate):
     removals: List[str]
 
 
+class TableMetadataUpdateContext:
+    updates: List[TableUpdate]
+    last_added_schema_id: Optional[int]
+
+    def __init__(self) -> None:
+        self.updates = []
+        self.last_added_schema_id = None
+
+    def is_added_snapshot(self, snapshot_id: int) -> bool:
+        return any(
+            update.snapshot.snapshot_id == snapshot_id
+            for update in self.updates
+            if update.action == TableUpdateAction.add_snapshot
+        )
+
+    def is_added_schema(self, schema_id: int) -> bool:
+        return any(
+            update.schema_.schema_id == schema_id for update in self.updates 
if update.action == TableUpdateAction.add_schema
+        )
+
+
+@singledispatch
+def apply_table_update(update: TableUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    """Apply a table update to the table metadata.
+
+    Args:
+        update: The update to be applied.
+        base_metadata: The base metadata to be updated.
+        context: Contains previous updates, last_added_snapshot_id and other 
change tracking information in the current transaction.
+
+    Returns:
+        The updated metadata.
+
+    """
+    raise NotImplementedError(f"Unsupported table update: {update}")
+
+
+@apply_table_update.register(UpgradeFormatVersionUpdate)
+def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, 
context: TableMetadataUpdateContext) -> TableMetadata:
+    if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION:
+        raise ValueError(f"Unsupported table format version: 
{update.format_version}")
+
+    if update.format_version < base_metadata.format_version:
+        raise ValueError(f"Cannot downgrade v{base_metadata.format_version} 
table to v{update.format_version}")
+
+    if update.format_version == base_metadata.format_version:
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["format-version"] = update.format_version
+
+    context.updates.append(update)
+    return TableMetadataUtil.parse_obj(updated_metadata_data)
+
+
+@apply_table_update.register(AddSchemaUpdate)
+def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: 
TableMetadataUpdateContext) -> TableMetadata:
+    def reuse_or_create_new_schema_id(new_schema: Schema) -> Tuple[int, bool]:
+        """Reuse schema id if schema already exists, otherwise create a new 
one.
+
+        Args:
+            new_schema: The new schema to be added.
+
+        Returns:
+            The new schema id and whether the schema already exists.
+        """
+        result_schema_id = base_metadata.current_schema_id
+        for schema in base_metadata.schemas:
+            if schema == new_schema:
+                return schema.schema_id, True
+            elif schema.schema_id >= result_schema_id:
+                result_schema_id = schema.schema_id + 1
+        return result_schema_id, False
+
+    if update.last_column_id < base_metadata.last_column_id:
+        raise ValueError(f"Invalid last column id {update.last_column_id}, 
must be >= {base_metadata.last_column_id}")
+
+    new_schema_id, schema_found = reuse_or_create_new_schema_id(update.schema_)
+    if schema_found and update.last_column_id == base_metadata.last_column_id:
+        if context.last_added_schema_id is not None and 
context.is_added_schema(new_schema_id):
+            context.last_added_schema_id = new_schema_id
+        return base_metadata
+
+    updated_metadata_data = copy(base_metadata.model_dump())
+    updated_metadata_data["last-column-id"] = update.last_column_id
+
+    new_schema = (
+        update.schema_
+        if new_schema_id == update.schema_.schema_id
+        else Schema(*update.schema_.fields, schema_id=new_schema_id, 
identifier_field_ids=update.schema_.identifier_field_ids)
+    )

Review Comment:
   > Thanks for the explanation! Just to confirm my understanding, in 
pyiceberg, one transaction should only have one AddSchemaUpdate because all 
updates related to the schema should be accumulated via UpdateSchema.
   
   This is correct. Currently, the `Update` has to be unique, so stacking two 
updates will even raise a `ValueError`.
   
   > Hence, we can trust the schema_id in the update and do not need the logic 
to handle multiple schema added in one transaction.
   
   This is correct 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to