Fokko commented on code in PR #498:
URL: https://github.com/apache/iceberg-python/pull/498#discussion_r1524665622


##########
pyiceberg/table/__init__.py:
##########
@@ -620,15 +711,17 @@ def _(update: AddPartitionSpecUpdate, base_metadata: 
TableMetadata, context: _Ta
         update={
             "partition_specs": base_metadata.partition_specs + [update.spec],
             "last_partition_id": max(
-                max(field.field_id for field in update.spec.fields),
+                max([field.field_id for field in update.spec.fields] + [0]),

Review Comment:
   Avoids a list concatenation :)
   ```suggestion
                   max(0, *[field.field_id for field in update.spec.fields]),
   ```



##########
pyiceberg/catalog/glue.py:
##########
@@ -435,46 +439,69 @@ def _commit_table(self, table_request: 
CommitTableRequest) -> CommitTableRespons
         )
         database_name, table_name = 
self.identifier_to_database_and_table(identifier_tuple)
 
-        current_glue_table = self._get_glue_table(database_name=database_name, 
table_name=table_name)
-        glue_table_version_id = current_glue_table.get("VersionId")
-        if not glue_table_version_id:
-            raise CommitFailedException(f"Cannot commit 
{database_name}.{table_name} because Glue table version id is missing")
-        current_table = 
self._convert_glue_to_iceberg(glue_table=current_glue_table)
-        base_metadata = current_table.metadata
-
-        # Validate the update requirements
-        for requirement in table_request.requirements:
-            requirement.validate(base_metadata)
-
-        updated_metadata = update_table_metadata(base_metadata, 
table_request.updates)
-        if updated_metadata == base_metadata:
-            # no changes, do nothing
-            return CommitTableResponse(metadata=base_metadata, 
metadata_location=current_table.metadata_location)
-
-        # write new metadata
-        new_metadata_version = 
self._parse_metadata_version(current_table.metadata_location) + 1
-        new_metadata_location = 
self._get_metadata_location(current_table.metadata.location, 
new_metadata_version)
-        self._write_metadata(updated_metadata, current_table.io, 
new_metadata_location)
-
-        update_table_input = _construct_table_input(
-            table_name=table_name,
-            metadata_location=new_metadata_location,
-            properties=current_table.properties,
-            metadata=updated_metadata,
-            glue_table=current_glue_table,
-            prev_metadata_location=current_table.metadata_location,
-        )
+        try:
+            current_glue_table = 
self._get_glue_table(database_name=database_name, table_name=table_name)
+            # Update the table
+            glue_table_version_id = current_glue_table.get("VersionId")
+            if not glue_table_version_id:
+                raise CommitFailedException(
+                    f"Cannot commit {database_name}.{table_name} because Glue 
table version id is missing"
+                )
+            current_table = 
self._convert_glue_to_iceberg(glue_table=current_glue_table)
+            base_metadata = current_table.metadata
+
+            # Validate the update requirements
+            for requirement in table_request.requirements:
+                requirement.validate(base_metadata)
+
+            updated_metadata = update_table_metadata(base_metadata, 
table_request.updates)
+            if updated_metadata == base_metadata:
+                # no changes, do nothing
+                return CommitTableResponse(metadata=base_metadata, 
metadata_location=current_table.metadata_location)
+
+            # write new metadata
+            new_metadata_version = 
self._parse_metadata_version(current_table.metadata_location) + 1
+            new_metadata_location = 
self._get_metadata_location(current_table.metadata.location, 
new_metadata_version)
+            self._write_metadata(updated_metadata, current_table.io, 
new_metadata_location)
+
+            update_table_input = _construct_table_input(
+                table_name=table_name,
+                metadata_location=new_metadata_location,
+                properties=current_table.properties,
+                metadata=updated_metadata,
+                glue_table=current_glue_table,
+                prev_metadata_location=current_table.metadata_location,
+            )
 
-        # Pass `version_id` to implement optimistic locking: it ensures 
updates are rejected if concurrent
-        # modifications occur. See more details at 
https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
-        self._update_glue_table(
-            database_name=database_name,
-            table_name=table_name,
-            table_input=update_table_input,
-            version_id=glue_table_version_id,
-        )
+            # Pass `version_id` to implement optimistic locking: it ensures 
updates are rejected if concurrent
+            # modifications occur. See more details at 
https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
+            self._update_glue_table(
+                database_name=database_name,
+                table_name=table_name,
+                table_input=update_table_input,
+                version_id=glue_table_version_id,
+            )
+
+            return CommitTableResponse(metadata=updated_metadata, 
metadata_location=new_metadata_location)
+        except NoSuchTableError:
+            # Create the table
+            updated_metadata = construct_table_metadata(table_request.updates)

Review Comment:
   I would expect `create_table` to be called from the `_commit` of the 
`Transaction`.



##########
pyiceberg/catalog/__init__.py:
##########
@@ -288,6 +291,78 @@ def __init__(self, name: str, **properties: str):
     def _load_file_io(self, properties: Properties = EMPTY_DICT, location: 
Optional[str] = None) -> FileIO:
         return load_file_io({**self.properties, **properties}, location)
 
+    def _create_staged_table(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Union[Schema, "pa.Schema"],
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> StagedTable:
+        """Create a table and return the table instance without committing the 
changes.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): Table's schema.
+            location (str | None): Location for the table. Optional Argument.
+            partition_spec (PartitionSpec): PartitionSpec for the table.
+            sort_order (SortOrder): SortOrder for the table.
+            properties (Properties): Table properties that can be a string 
based dictionary.
+
+        Returns:
+            Table: the created table instance.
+
+        Raises:
+            TableAlreadyExistsError: If a table with the name already exists.
+        """
+        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+
+        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
+
+        location = self._resolve_table_location(location, database_name, 
table_name)
+        metadata_location = self._get_metadata_location(location=location)
+        metadata = new_table_metadata(
+            location=location, schema=schema, partition_spec=partition_spec, 
sort_order=sort_order, properties=properties
+        )
+        io = load_file_io(properties=self.properties, 
location=metadata_location)
+        return StagedTable(
+            identifier=(self.name, database_name, table_name),
+            metadata=metadata,
+            metadata_location=metadata_location,
+            io=io,
+            catalog=self,
+        )
+
+    def create_table_transaction(

Review Comment:
   WDYT?
   ```suggestion
       def create_table_staged(
   ```



##########
pyiceberg/table/metadata.py:
##########
@@ -287,6 +287,31 @@ def _generate_snapshot_id() -> int:
     return snapshot_id
 
 
+class IncompleteTableMetadata(TableMetadataCommonFields):

Review Comment:
   Do you know if we need this class? I don't like that we must update the 
return signature when updating the table metadata to `TableMetadata | 
IncompleteTableMetadata`. While the name looks very similar, the classes have 
no OOP relationship. When creating a table we need a UUID (which we can 
generate), location, and an initial schema. Looking at the tests, we have all 
of those.



##########
pyiceberg/catalog/__init__.py:
##########
@@ -288,6 +291,78 @@ def __init__(self, name: str, **properties: str):
     def _load_file_io(self, properties: Properties = EMPTY_DICT, location: 
Optional[str] = None) -> FileIO:
         return load_file_io({**self.properties, **properties}, location)
 
+    def _create_staged_table(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Union[Schema, "pa.Schema"],
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> StagedTable:
+        """Create a table and return the table instance without committing the 
changes.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): Table's schema.
+            location (str | None): Location for the table. Optional Argument.
+            partition_spec (PartitionSpec): PartitionSpec for the table.
+            sort_order (SortOrder): SortOrder for the table.
+            properties (Properties): Table properties that can be a string 
based dictionary.
+
+        Returns:
+            Table: the created table instance.
+
+        Raises:
+            TableAlreadyExistsError: If a table with the name already exists.
+        """
+        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+
+        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
+
+        location = self._resolve_table_location(location, database_name, 
table_name)
+        metadata_location = self._get_metadata_location(location=location)
+        metadata = new_table_metadata(
+            location=location, schema=schema, partition_spec=partition_spec, 
sort_order=sort_order, properties=properties
+        )
+        io = load_file_io(properties=self.properties, 
location=metadata_location)
+        return StagedTable(
+            identifier=(self.name, database_name, table_name),
+            metadata=metadata,
+            metadata_location=metadata_location,
+            io=io,
+            catalog=self,
+        )
+
+    def create_table_transaction(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Union[Schema, "pa.Schema"],
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> CreateTableTransaction:
+        """Create a CreateTableTransaction.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): Table's schema.
+            location (str | None): Location for the table. Optional Argument.
+            partition_spec (PartitionSpec): PartitionSpec for the table.
+            sort_order (SortOrder): SortOrder for the table.
+            properties (Properties): Table properties that can be a string 
based dictionary.
+
+        Returns:
+            CreateTableTransaction: createTableTransaction instance.
+
+        Raises:
+            TableAlreadyExistsError: If a table with the name already exists.
+        """
+        return CreateTableTransaction(
+            self._create_staged_table(identifier, schema, location, 
partition_spec, sort_order, properties)

Review Comment:
   Style nit: I would just inline this function; it avoids scrolling :)



##########
tests/integration/test_writes.py:
##########
@@ -632,6 +632,60 @@ def test_write_and_evolve(session_catalog: Catalog, 
format_version: int) -> None
                 snapshot_update.append_data_file(data_file)
 
 
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [2])
+def test_create_table_transaction(session_catalog: Catalog, format_version: 
int) -> None:
+    if format_version == 1:
+        pytest.skip(
+            "There is a bug in the REST catalog (maybe server side) that 
prevents create and commit a staged version 1 table"

Review Comment:
   Interesting, let me track this down.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to