Fokko commented on code in PR #498: URL: https://github.com/apache/iceberg-python/pull/498#discussion_r1524665622
########## pyiceberg/table/__init__.py: ########## @@ -620,15 +711,17 @@ def _(update: AddPartitionSpecUpdate, base_metadata: TableMetadata, context: _Ta update={ "partition_specs": base_metadata.partition_specs + [update.spec], "last_partition_id": max( - max(field.field_id for field in update.spec.fields), + max([field.field_id for field in update.spec.fields] + [0]), Review Comment: Avoids a list concatenation :) ```suggestion max(0, *[field.field_id for field in update.spec.fields]), ``` ########## pyiceberg/catalog/glue.py: ########## @@ -435,46 +439,69 @@ def _commit_table(self, table_request: CommitTableRequest) -> CommitTableRespons ) database_name, table_name = self.identifier_to_database_and_table(identifier_tuple) - current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) - glue_table_version_id = current_glue_table.get("VersionId") - if not glue_table_version_id: - raise CommitFailedException(f"Cannot commit {database_name}.{table_name} because Glue table version id is missing") - current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) - base_metadata = current_table.metadata - - # Validate the update requirements - for requirement in table_request.requirements: - requirement.validate(base_metadata) - - updated_metadata = update_table_metadata(base_metadata, table_request.updates) - if updated_metadata == base_metadata: - # no changes, do nothing - return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) - - # write new metadata - new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 - new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) - self._write_metadata(updated_metadata, current_table.io, new_metadata_location) - - update_table_input = _construct_table_input( - table_name=table_name, - metadata_location=new_metadata_location, - 
properties=current_table.properties, - metadata=updated_metadata, - glue_table=current_glue_table, - prev_metadata_location=current_table.metadata_location, - ) + try: + current_glue_table = self._get_glue_table(database_name=database_name, table_name=table_name) + # Update the table + glue_table_version_id = current_glue_table.get("VersionId") + if not glue_table_version_id: + raise CommitFailedException( + f"Cannot commit {database_name}.{table_name} because Glue table version id is missing" + ) + current_table = self._convert_glue_to_iceberg(glue_table=current_glue_table) + base_metadata = current_table.metadata + + # Validate the update requirements + for requirement in table_request.requirements: + requirement.validate(base_metadata) + + updated_metadata = update_table_metadata(base_metadata, table_request.updates) + if updated_metadata == base_metadata: + # no changes, do nothing + return CommitTableResponse(metadata=base_metadata, metadata_location=current_table.metadata_location) + + # write new metadata + new_metadata_version = self._parse_metadata_version(current_table.metadata_location) + 1 + new_metadata_location = self._get_metadata_location(current_table.metadata.location, new_metadata_version) + self._write_metadata(updated_metadata, current_table.io, new_metadata_location) + + update_table_input = _construct_table_input( + table_name=table_name, + metadata_location=new_metadata_location, + properties=current_table.properties, + metadata=updated_metadata, + glue_table=current_glue_table, + prev_metadata_location=current_table.metadata_location, + ) - # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent - # modifications occur. 
See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking - self._update_glue_table( - database_name=database_name, - table_name=table_name, - table_input=update_table_input, - version_id=glue_table_version_id, - ) + # Pass `version_id` to implement optimistic locking: it ensures updates are rejected if concurrent + # modifications occur. See more details at https://iceberg.apache.org/docs/latest/aws/#optimistic-locking + self._update_glue_table( + database_name=database_name, + table_name=table_name, + table_input=update_table_input, + version_id=glue_table_version_id, + ) + + return CommitTableResponse(metadata=updated_metadata, metadata_location=new_metadata_location) + except NoSuchTableError: + # Create the table + updated_metadata = construct_table_metadata(table_request.updates) Review Comment: I would expect `create_table` to be called from the `Transaction`'s `_commit`. ########## pyiceberg/catalog/__init__.py: ########## @@ -288,6 +291,78 @@ def __init__(self, name: str, **properties: str): def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO: return load_file_io({**self.properties, **properties}, location) + def _create_staged_table( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> StagedTable: + """Create a table and return the table instance without committing the changes. + + Args: + identifier (str | Identifier): Table identifier. + schema (Schema): Table's schema. + location (str | None): Location for the table. Optional Argument. + partition_spec (PartitionSpec): PartitionSpec for the table. + sort_order (SortOrder): SortOrder for the table. + properties (Properties): Table properties that can be a string based dictionary. 
+ + Returns: + Table: the created table instance. + + Raises: + TableAlreadyExistsError: If a table with the name already exists. + """ + schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + + database_name, table_name = self.identifier_to_database_and_table(identifier) + + location = self._resolve_table_location(location, database_name, table_name) + metadata_location = self._get_metadata_location(location=location) + metadata = new_table_metadata( + location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties + ) + io = load_file_io(properties=self.properties, location=metadata_location) + return StagedTable( + identifier=(self.name, database_name, table_name), + metadata=metadata, + metadata_location=metadata_location, + io=io, + catalog=self, + ) + + def create_table_transaction( Review Comment: What do you think? ```suggestion def create_table_staged( ``` ########## pyiceberg/table/metadata.py: ########## @@ -287,6 +287,31 @@ def _generate_snapshot_id() -> int: return snapshot_id +class IncompleteTableMetadata(TableMetadataCommonFields): Review Comment: Do you know if we need this class? I don't like that we must update the return signature when updating the table metadata to `TableMetadata | IncompleteTableMetadata`. While the names look very similar, the classes have no OOP relationship. When creating a table we need a UUID (which we can generate), a location, and an initial schema. Looking at the tests, we have all of those. 
########## pyiceberg/catalog/__init__.py: ########## @@ -288,6 +291,78 @@ def __init__(self, name: str, **properties: str): def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO: return load_file_io({**self.properties, **properties}, location) + def _create_staged_table( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> StagedTable: + """Create a table and return the table instance without committing the changes. + + Args: + identifier (str | Identifier): Table identifier. + schema (Schema): Table's schema. + location (str | None): Location for the table. Optional Argument. + partition_spec (PartitionSpec): PartitionSpec for the table. + sort_order (SortOrder): SortOrder for the table. + properties (Properties): Table properties that can be a string based dictionary. + + Returns: + Table: the created table instance. + + Raises: + TableAlreadyExistsError: If a table with the name already exists. 
+ """ + schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + + database_name, table_name = self.identifier_to_database_and_table(identifier) + + location = self._resolve_table_location(location, database_name, table_name) + metadata_location = self._get_metadata_location(location=location) + metadata = new_table_metadata( + location=location, schema=schema, partition_spec=partition_spec, sort_order=sort_order, properties=properties + ) + io = load_file_io(properties=self.properties, location=metadata_location) + return StagedTable( + identifier=(self.name, database_name, table_name), + metadata=metadata, + metadata_location=metadata_location, + io=io, + catalog=self, + ) + + def create_table_transaction( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> CreateTableTransaction: + """Create a CreateTableTransaction. + + Args: + identifier (str | Identifier): Table identifier. + schema (Schema): Table's schema. + location (str | None): Location for the table. Optional Argument. + partition_spec (PartitionSpec): PartitionSpec for the table. + sort_order (SortOrder): SortOrder for the table. + properties (Properties): Table properties that can be a string based dictionary. + + Returns: + CreateTableTransaction: createTableTransaction instance. + + Raises: + TableAlreadyExistsError: If a table with the name already exists. 
+ """ + return CreateTableTransaction( + self._create_staged_table(identifier, schema, location, partition_spec, sort_order, properties) Review Comment: Style nit: I would just inline this function, avoids scrolling :) ########## tests/integration/test_writes.py: ########## @@ -632,6 +632,60 @@ def test_write_and_evolve(session_catalog: Catalog, format_version: int) -> None snapshot_update.append_data_file(data_file) +@pytest.mark.integration +@pytest.mark.parametrize("format_version", [2]) +def test_create_table_transaction(session_catalog: Catalog, format_version: int) -> None: + if format_version == 1: + pytest.skip( + "There is a bug in the REST catalog (maybe server side) that prevents create and commit a staged version 1 table" Review Comment: Interesting, let me track this down. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org