HonahX commented on code in PR #139: URL: https://github.com/apache/iceberg-python/pull/139#discussion_r1401649802
########## pyiceberg/table/__init__.py: ########## @@ -350,6 +357,241 @@ class RemovePropertiesUpdate(TableUpdate): removals: List[str] +class TableMetadataUpdateContext: + updates: List[TableUpdate] + last_added_schema_id: Optional[int] + + def __init__(self) -> None: + self.updates = [] + self.last_added_schema_id = None + + def is_added_snapshot(self, snapshot_id: int) -> bool: + return any( + update.snapshot.snapshot_id == snapshot_id + for update in self.updates + if update.action == TableUpdateAction.add_snapshot + ) + + def is_added_schema(self, schema_id: int) -> bool: + return any( + update.schema_.schema_id == schema_id for update in self.updates if update.action == TableUpdateAction.add_schema + ) + + +@singledispatch +def apply_table_update(update: TableUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata: + """Apply a table update to the table metadata. + + Args: + update: The update to be applied. + base_metadata: The base metadata to be updated. + context: Contains previous updates, last_added_snapshot_id and other change tracking information in the current transaction. + + Returns: + The updated metadata. 
+ + """ + raise NotImplementedError(f"Unsupported table update: {update}") + + +@apply_table_update.register(UpgradeFormatVersionUpdate) +def _(update: UpgradeFormatVersionUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata: + if update.format_version > SUPPORTED_TABLE_FORMAT_VERSION: + raise ValueError(f"Unsupported table format version: {update.format_version}") + + if update.format_version < base_metadata.format_version: + raise ValueError(f"Cannot downgrade v{base_metadata.format_version} table to v{update.format_version}") + + if update.format_version == base_metadata.format_version: + return base_metadata + + updated_metadata_data = copy(base_metadata.model_dump()) + updated_metadata_data["format-version"] = update.format_version + + context.updates.append(update) + return TableMetadataUtil.parse_obj(updated_metadata_data) + + +@apply_table_update.register(AddSchemaUpdate) +def _(update: AddSchemaUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata: + def reuse_or_create_new_schema_id(new_schema: Schema) -> Tuple[int, bool]: + """Reuse schema id if schema already exists, otherwise create a new one. + + Args: + new_schema: The new schema to be added. + + Returns: + The new schema id and whether the schema already exists. 
+ """ + result_schema_id = base_metadata.current_schema_id + for schema in base_metadata.schemas: + if schema == new_schema: + return schema.schema_id, True + elif schema.schema_id >= result_schema_id: + result_schema_id = schema.schema_id + 1 + return result_schema_id, False + + if update.last_column_id < base_metadata.last_column_id: + raise ValueError(f"Invalid last column id {update.last_column_id}, must be >= {base_metadata.last_column_id}") + + new_schema_id, schema_found = reuse_or_create_new_schema_id(update.schema_) + if schema_found and update.last_column_id == base_metadata.last_column_id: + if context.last_added_schema_id is not None and context.is_added_schema(new_schema_id): + context.last_added_schema_id = new_schema_id + return base_metadata + + updated_metadata_data = copy(base_metadata.model_dump()) + updated_metadata_data["last-column-id"] = update.last_column_id + + new_schema = ( + update.schema_ + if new_schema_id == update.schema_.schema_id + else Schema(*update.schema_.fields, schema_id=new_schema_id, identifier_field_ids=update.schema_.identifier_field_ids) + ) + + if not schema_found: + updated_metadata_data["schemas"].append(new_schema.model_dump()) + + context.updates.append(update) + context.last_added_schema_id = new_schema_id + return TableMetadataUtil.parse_obj(updated_metadata_data) + + +@apply_table_update.register(SetCurrentSchemaUpdate) +def _(update: SetCurrentSchemaUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata: + if update.schema_id == -1: + if context.last_added_schema_id is None: + raise ValueError("Cannot set current schema to last added schema when no schema has been added") + return apply_table_update(SetCurrentSchemaUpdate(schema_id=context.last_added_schema_id), base_metadata, context) + + if update.schema_id == base_metadata.current_schema_id: + return base_metadata + + schema = base_metadata.schemas_by_id.get(update.schema_id) + if schema is None: + raise 
ValueError(f"Schema with id {update.schema_id} does not exist") + + updated_metadata_data = copy(base_metadata.model_dump()) + updated_metadata_data["current-schema-id"] = update.schema_id + + if context.last_added_schema_id is not None and context.last_added_schema_id == update.schema_id: + context.updates.append(SetCurrentSchemaUpdate(schema_id=-1)) + else: + context.updates.append(update) + + return TableMetadataUtil.parse_obj(updated_metadata_data) + + +@apply_table_update.register(AddSnapshotUpdate) +def _(update: AddSnapshotUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata: + if len(base_metadata.schemas) == 0: + raise ValueError("Attempting to add a snapshot before a schema is added") + + if len(base_metadata.partition_specs) == 0: + raise ValueError("Attempting to add a snapshot before a partition spec is added") + + if len(base_metadata.sort_orders) == 0: + raise ValueError("Attempting to add a snapshot before a sort order is added") + + if base_metadata.snapshots_by_id.get(update.snapshot.snapshot_id) is not None: + raise ValueError(f"Snapshot with id {update.snapshot.snapshot_id} already exists") + + if ( + base_metadata.format_version == 2 + and update.snapshot.sequence_number is not None + and update.snapshot.sequence_number <= base_metadata.last_sequence_number + and update.snapshot.parent_snapshot_id is not None + ): + raise ValueError( + f"Cannot add snapshot with sequence number {update.snapshot.sequence_number} " + f"older than last sequence number {base_metadata.last_sequence_number}" + ) + + updated_metadata_data = copy(base_metadata.model_dump()) + updated_metadata_data["last-updated-ms"] = update.snapshot.timestamp_ms + updated_metadata_data["last-sequence-number"] = update.snapshot.sequence_number + updated_metadata_data["snapshots"].append(update.snapshot.model_dump()) + context.updates.append(update) + return TableMetadataUtil.parse_obj(updated_metadata_data) + + 
+@apply_table_update.register(SetSnapshotRefUpdate) +def _(update: SetSnapshotRefUpdate, base_metadata: TableMetadata, context: TableMetadataUpdateContext) -> TableMetadata: + if update.type is None: + raise ValueError("Snapshot ref type must be set") + + if update.min_snapshots_to_keep is not None and update.type == SnapshotRefType.TAG: + raise ValueError("Cannot set min snapshots to keep for branch refs") + + if update.min_snapshots_to_keep is not None and update.min_snapshots_to_keep <= 0: + raise ValueError("Minimum snapshots to keep must be >= 0") + + if update.max_snapshot_age_ms is not None and update.type == SnapshotRefType.TAG: + raise ValueError("Tags do not support setting maxSnapshotAgeMs") + + if update.max_snapshot_age_ms is not None and update.max_snapshot_age_ms <= 0: + raise ValueError("Max snapshot age must be > 0 ms") + + if update.max_ref_age_ms is not None and update.max_ref_age_ms <= 0: + raise ValueError("Max ref age must be > 0 ms") Review Comment: Thanks for the suggestion! I moved all the validations to the `SnapshotRef` model in `refs.py`. (Pydantic is great!) There is one weird thing I observed: I first tried to construct an optional int field in the way suggested by [pydantic numeric constraints](https://docs.pydantic.dev/latest/concepts/fields/#numeric-constraints): ```python min_snapshots_to_keep: Optional[Annotated[int, Field(alias="min-snapshots-to-keep", default=None, gt=0)]] ``` However, this will make `min_snapshots_to_keep` a required field, since I get a validation error when constructing a `SnapshotRef` instance without this field. Later I found that defining the type like this: ```python min_snapshots_to_keep: Annotated[Optional[int], Field(alias="min-snapshots-to-keep", default=None, gt=0)] ``` could make everything work as expected. I've added some tests in `test_refs.py` for this change. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org