HonahX commented on code in PR #498: URL: https://github.com/apache/iceberg-python/pull/498#discussion_r1536944882
########## pyiceberg/table/metadata.py: ########## @@ -366,6 +366,11 @@ def construct_partition_specs(cls, data: Dict[str, Any]) -> Dict[str, Any]: fields = data[PARTITION_SPEC] data[PARTITION_SPECS] = [{SPEC_ID: INITIAL_SPEC_ID, FIELDS: fields}] data[DEFAULT_SPEC_ID] = INITIAL_SPEC_ID + elif data.get("partition_spec") is not None: Review Comment: This is also included in https://github.com/apache/iceberg-python/pull/544 ########## pyiceberg/catalog/__init__.py: ########## @@ -288,6 +291,78 @@ def __init__(self, name: str, **properties: str): def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO: return load_file_io({**self.properties, **properties}, location) + def _create_staged_table( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> StagedTable: + """Create a table and return the table instance without committing the changes. + + Args: + identifier (str | Identifier): Table identifier. + schema (Schema): Table's schema. + location (str | None): Location for the table. Optional Argument. + partition_spec (PartitionSpec): PartitionSpec for the table. + sort_order (SortOrder): SortOrder for the table. + properties (Properties): Table properties that can be a string based dictionary. + + Returns: + Table: the created table instance. + + Raises: + TableAlreadyExistsError: If a table with the name already exists. 
+ """ + schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + + database_name, table_name = self.identifier_to_database_and_table(identifier) + + location = self._resolve_table_location(location, database_name, table_name) + metadata_location = self._get_metadata_location(location=location) + metadata = new_table_metadata( + location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties + ) + io = load_file_io(properties=self.properties, location=metadata_location) + return StagedTable( + identifier=(self.name, database_name, table_name), + metadata=metadata, + metadata_location=metadata_location, + io=io, + catalog=self, + ) + + def create_table_transaction( Review Comment: While I don't have a strong preference, I think the name `create_table_transaction` clearly indicates that the API returns a "Transaction." This naming choice becomes particularly relevant when we consider potential usage scenarios. For example: ```python with catalog.create_table_transaction( identifier="docs_example.bids", schema=schema, location="s3://pyiceberg", partition_spec=partition_spec, sort_order=sort_order, ) as txn: with txn.update_schema() as update_schema: update_schema.add_column(path="new_column", field_type=StringType()) with txn.update_spec() as update_spec: update_spec.add_identity("symbol") txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c") ``` Given this context, I'm inclined to keep the name as it is for now. However, I'm completely open to further discussion and would love to hear any thoughts or suggestions on this matter.
########## pyiceberg/catalog/glue.py: ########## @@ -435,46 +439,69 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) - current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) - glue_table_version_id = current_glue_table.get("VersionId") - if not glue_table_version_id: - raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing") - current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) - base_metadata = current_table.metadata - - # Validate the update requirements - for requirement in table_request.requirements: - requirement.validate(base_metadata) - - updated_metadata = update_table_metadata(base_metadata, table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) - - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) - - update_table_input = _construct_table_input( - table_name=table_name, - metadata_location=new_metadata_location, - properties=current_table.properties, - metadata=updated_metadata, - glue_table=current_glue_table, - prev_metadata_location=current_table.metadata_location, - ) + try: + current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) + # Update the table + glue_table_version_id = current_glue_table.get("VersionId") + if not glue_table_version_id: + raise CommitFailedException( + f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" + ) + current_table = 
self._convert_glue_to_iceberg(glue_table=current_glue_table) + base_metadata = current_table.metadata + + # Validate the update requirements + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + update_table_input = _construct_table_input( + table_name=table_name, + metadata_location=new_metadata_location, + properties=current_table.properties, + metadata=updated_metadata, + glue_table=current_glue_table, + prev_metadata_location=current_table.metadata_location, + ) - # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent - # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking - self._update_glue_table( - database_name=database_name, - table_name=table_name, - table_input=update_table_input, - version_id=glue_table_version_id, - ) + # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent + # modifications occur. 
See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking + self._update_glue_table( + database_name=database_name, + table_name=table_name, + table_input=update_table_input, + version_id=glue_table_version_id, + ) + + return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + except NoSuchTableError: + # Create the table + updated_metadata = construct_table_metadata(table_request.updates) Review Comment: I considered that, and my rationale for the current implementation centers on ensuring a uniform transaction creation and commit process for both RestCatalogs and other types of catalogs. Specifically, for RestCatalogs, it's required to initiate CreateTableTransaction with `_create_table(staged_create=True)` and to use `_commit_table` with both initial and subsequent updates during transaction commitment. On the other hand, alternative catalogs offer more flexibility, allowing either the use of `_commit_table` to reconstruct table metadata upon commitment or a modified `_create_table` API to create the table during transaction commitment. Considering pyiceberg's alignment with REST API principles, where `_commit_table` aggregates metadata updates to construct the revised metadata for table updates within a transaction, it seems prudent to maintain consistency with REST API practices for table creation within transactions. This approach simplifies the process by relying on `_commit_table` to generate and commit metadata from scratch, eliminating the need to distinguish between RestCatalogs and other catalog types during transaction commitments. Additionally, I've noted that the existing `create_table` and `new_table_metadata` APIs lack support for initializing metadata with snapshot information. I think that responsibility should belong to `AddSnapshotUpdate` and `update_table_metadata`. Thus, I've opted to maintain the current approach of utilizing `_commit_table` for both functions.
Does this approach sound reasonable to you? Please feel free to correct me if I've misunderstood any aspect of this process. Thanks for your input! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org