HonahX commented on code in PR #498:
URL: https://github.com/apache/iceberg-python/pull/498#discussion_r1536944882


##########
pyiceberg/table/metadata.py:
##########
@@ -366,6 +366,11 @@ def construct_partition_specs(cls, data: Dict[str, Any]) 
-> Dict[str, Any]:
                 fields = data[PARTITION_SPEC]
                 data[PARTITION_SPECS] = [{SPEC_ID: INITIAL_SPEC_ID, FIELDS: 
fields}]
                 data[DEFAULT_SPEC_ID] = INITIAL_SPEC_ID
+            elif data.get("partition_spec") is not None:

Review Comment:
   This is also included in https://github.com/apache/iceberg-python/pull/544



##########
pyiceberg/catalog/__init__.py:
##########
@@ -288,6 +291,78 @@ def __init__(self, name: str, **properties: str):
     def _load_file_io(self, properties: Properties = EMPTY_DICT, location: 
Optional[str] = None) -> FileIO:
         return load_file_io({**self.properties, **properties}, location)
 
+    def _create_staged_table(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Union[Schema, "pa.Schema"],
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> StagedTable:
+        """Create a table and return the table instance without committing the 
changes.
+
+        Args:
+            identifier (str | Identifier): Table identifier.
+            schema (Schema): Table's schema.
+            location (str | None): Location for the table. Optional Argument.
+            partition_spec (PartitionSpec): PartitionSpec for the table.
+            sort_order (SortOrder): SortOrder for the table.
+            properties (Properties): Table properties that can be a string 
based dictionary.
+
+        Returns:
+            Table: the created table instance.
+
+        Raises:
+            TableAlreadyExistsError: If a table with the name already exists.
+        """
+        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+
+        database_name, table_name = 
self.identifier_to_database_and_table(identifier)
+
+        location = self._resolve_table_location(location, database_name, 
table_name)
+        metadata_location = self._get_metadata_location(location=location)
+        metadata = new_table_metadata(
+            location=location, schema=schema, partition_spec=partition_spec, 
sort_order=sort_order, properties=properties
+        )
+        io = load_file_io(properties=self.properties, 
location=metadata_location)
+        return StagedTable(
+            identifier=(self.name, database_name, table_name),
+            metadata=metadata,
+            metadata_location=metadata_location,
+            io=io,
+            catalog=self,
+        )
+
+    def create_table_transaction(

Review Comment:
   While I don't have a strong preference, Ithink the name 
`create_table_transaction` clearly indicates that the API returns a 
"Transaction." This naming choice becomes particularly relevant when we 
consider potential usage scenarios. For example:
   ```python
   with catalog.create_table_transaction(
       identifier="docs_example.bids",
       schema=schema,
       location="s3://pyiceberg",
       partition_spec=partition_spec,
       sort_order=sort_order,
   ) as txn:
       with txn.update_schema() as update_schema:
           update_schema.add_column(path="new_column", field_type=StringType())
   
       with txn.update_spec() as update_spec:
           update_spec.add_identity("symbol")
   
       txn.set_properties(test_a="test_aa", test_b="test_b", test_c="test_c")
   ```
   Given this context, I'm inclined to keep the name as it is for now. However, 
I'm completely open to further discussion and would love to hear any thoughts 
or suggestions on this matter.
   



##########
pyiceberg/catalog/glue.py:
##########
@@ -435,46 +439,69 @@ def _commit_table(self, table_request: 
CommitTableRequest) -> CommitTableRespons
         )
         database_name, table_name = 
self.identifier_to_database_and_table(identifier_tuple)
 
-        current_glue_table = self._get_glue_table(database_name=database_name, 
table_name=table_name)
-        glue_table_version_id = current_glue_table.get("VersionId")
-        if not glue_table_version_id:
-            raise CommitFailedException(f"Cannot commit 
{database_name}.{table_name} because Glue table version id is missing")
-        current_table = 
self._convert_glue_to_iceberg(glue_table=current_glue_table)
-        base_metadata = current_table.metadata
-
-        # Validate the update requirements
-        for requirement in table_request.requirements:
-            requirement.validate(base_metadata)
-
-        updated_metadata = update_table_metadata(base_metadata, 
table_request.updates)
-        if updated_metadata == base_metadata:
-            # no changes, do nothing
-            return CommitTableResponse(metadata=base_metadata, 
metadata_location=current_table.metadata_location)
-
-        # write new metadata
-        new_metadata_version = 
self._parse_metadata_version(current_table.metadata_location) + 1
-        new_metadata_location = 
self._get_metadata_location(current_table.metadata.location, 
new_metadata_version)
-        self._write_metadata(updated_metadata, current_table.io, 
new_metadata_location)
-
-        update_table_input = _construct_table_input(
-            table_name=table_name,
-            metadata_location=new_metadata_location,
-            properties=current_table.properties,
-            metadata=updated_metadata,
-            glue_table=current_glue_table,
-            prev_metadata_location=current_table.metadata_location,
-        )
+        try:
+            current_glue_table = 
self._get_glue_table(database_name=database_name, table_name=table_name)
+            # Update the table
+            glue_table_version_id = current_glue_table.get("VersionId")
+            if not glue_table_version_id:
+                raise CommitFailedException(
+                    f"Cannot commit {database_name}.{table_name} because Glue 
table version id is missing"
+                )
+            current_table = 
self._convert_glue_to_iceberg(glue_table=current_glue_table)
+            base_metadata = current_table.metadata
+
+            # Validate the update requirements
+            for requirement in table_request.requirements:
+                requirement.validate(base_metadata)
+
+            updated_metadata = update_table_metadata(base_metadata, 
table_request.updates)
+            if updated_metadata == base_metadata:
+                # no changes, do nothing
+                return CommitTableResponse(metadata=base_metadata, 
metadata_location=current_table.metadata_location)
+
+            # write new metadata
+            new_metadata_version = 
self._parse_metadata_version(current_table.metadata_location) + 1
+            new_metadata_location = 
self._get_metadata_location(current_table.metadata.location, 
new_metadata_version)
+            self._write_metadata(updated_metadata, current_table.io, 
new_metadata_location)
+
+            update_table_input = _construct_table_input(
+                table_name=table_name,
+                metadata_location=new_metadata_location,
+                properties=current_table.properties,
+                metadata=updated_metadata,
+                glue_table=current_glue_table,
+                prev_metadata_location=current_table.metadata_location,
+            )
 
-        # Pass `version_id` to implement optimistic locking: it ensures 
updates are rejected if concurrent
-        # modifications occur. See more details at 
https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
-        self._update_glue_table(
-            database_name=database_name,
-            table_name=table_name,
-            table_input=update_table_input,
-            version_id=glue_table_version_id,
-        )
+            # Pass `version_id` to implement optimistic locking: it ensures 
updates are rejected if concurrent
+            # modifications occur. See more details at 
https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
+            self._update_glue_table(
+                database_name=database_name,
+                table_name=table_name,
+                table_input=update_table_input,
+                version_id=glue_table_version_id,
+            )
+
+            return CommitTableResponse(metadata=updated_metadata, 
metadata_location=new_metadata_location)
+        except NoSuchTableError:
+            # Create the table
+            updated_metadata = construct_table_metadata(table_request.updates)

Review Comment:
   I was thinking of it, and my rationale for the current implementation 
centers around ensuring a uniform transaction creation and commit process for 
both RestCatalogs and other types of catalogs. Specifically, for RestCatalogs, 
it's required to initiate CreateTableTransaction with 
`_create_table(staged_create=True)` and to use `_commit_table` with both 
initial and subsequent updates during transaction commitment. On the other 
hand, alternative catalogs offer more flexibility, allowing for either the use 
of` _commit_table` to reconstruct table metadata upon commitment or a modified` 
_create_table` API to create table during the transaction commitment.
   
   Considering pyiceberg's alignment with Rest API principles, where` 
_commit_table` aggregates metadata updates to construct the revised metadata 
for table updates within a transaction, it seems prudent to maintain 
consistency with Rest API practices for table creation within transactions. 
This approach simplifies the process by relying on `_commit_table` to generate 
and commit metadata from scratch, eliminating the need to distinguish between 
RestCatalogs and other catalog types during transaction commitments.
   
   Additionally, I've noted that the existing create_table and 
new_table_metadata APIs lack support for initializing metadata with snapshot 
information. I think that responsibility should belong to `AddSnapshotUpdate` 
and `update_table_metadata`. Thus, I've opted to maintain the current approach 
of utilizing ` _commit_table` for both functions. 
   
   Does this approach sound reasonable to you? Please feel free to correct me 
if I've misunderstood any aspect of this process. Thanks for your input!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to