Fokko commented on code in PR #498:
URL: https://github.com/apache/iceberg-python/pull/498#discussion_r1524665622
##########
pyiceberg/table/__init__.py:
##########
@@ -620,15 +711,17 @@ def _(update: AddPartitionSpecUpdate, base_metadata:
TableMetadata, context: _Ta
update={
"partition_specs": base_metadata.partition_specs + [update.spec],
"last_partition_id": max(
- max(field.field_id for field in update.spec.fields),
+ max([field.field_id for field in update.spec.fields] + [0]),
Review Comment:
Avoids a list concatenation :)
```suggestion
max(0, *[field.field_id for field in update.spec.fields]),
```
##########
pyiceberg/catalog/glue.py:
##########
@@ -435,46 +439,69 @@ def _commit_table(self, table_request:
CommitTableRequest) -> CommitTableRespons
)
database_name, table_name =
self.identifier_to_database_and_table(identifier_tuple)
- current_glue_table = self._get_glue_table(database_name=database_name,
table_name=table_name)
- glue_table_version_id = current_glue_table.get("VersionId")
- if not glue_table_version_id:
- raise CommitFailedException(f"Cannot commit
{database_name}.{table_name} because Glue table version id is missing")
- current_table =
self._convert_glue_to_iceberg(glue_table=current_glue_table)
- base_metadata = current_table.metadata
-
- # Validate the update requirements
- for requirement in table_request.requirements:
- requirement.validate(base_metadata)
-
- updated_metadata = update_table_metadata(base_metadata,
table_request.updates)
- if updated_metadata == base_metadata:
- # no changes, do nothing
- return CommitTableResponse(metadata=base_metadata,
metadata_location=current_table.metadata_location)
-
- # write new metadata
- new_metadata_version =
self._parse_metadata_version(current_table.metadata_location) + 1
- new_metadata_location =
self._get_metadata_location(current_table.metadata.location,
new_metadata_version)
- self._write_metadata(updated_metadata, current_table.io,
new_metadata_location)
-
- update_table_input = _construct_table_input(
- table_name=table_name,
- metadata_location=new_metadata_location,
- properties=current_table.properties,
- metadata=updated_metadata,
- glue_table=current_glue_table,
- prev_metadata_location=current_table.metadata_location,
- )
+ try:
+ current_glue_table =
self._get_glue_table(database_name=database_name, table_name=table_name)
+ # Update the table
+ glue_table_version_id = current_glue_table.get("VersionId")
+ if not glue_table_version_id:
+ raise CommitFailedException(
+ f"Cannot commit {database_name}.{table_name} because Glue
table version id is missing"
+ )
+ current_table =
self._convert_glue_to_iceberg(glue_table=current_glue_table)
+ base_metadata = current_table.metadata
+
+ # Validate the update requirements
+ for requirement in table_request.requirements:
+ requirement.validate(base_metadata)
+
+ updated_metadata = update_table_metadata(base_metadata,
table_request.updates)
+ if updated_metadata == base_metadata:
+ # no changes, do nothing
+ return CommitTableResponse(metadata=base_metadata,
metadata_location=current_table.metadata_location)
+
+ # write new metadata
+ new_metadata_version =
self._parse_metadata_version(current_table.metadata_location) + 1
+ new_metadata_location =
self._get_metadata_location(current_table.metadata.location,
new_metadata_version)
+ self._write_metadata(updated_metadata, current_table.io,
new_metadata_location)
+
+ update_table_input = _construct_table_input(
+ table_name=table_name,
+ metadata_location=new_metadata_location,
+ properties=current_table.properties,
+ metadata=updated_metadata,
+ glue_table=current_glue_table,
+ prev_metadata_location=current_table.metadata_location,
+ )
- # Pass `version_id` to implement optimistic locking: it ensures
updates are rejected if concurrent
- # modifications occur. See more details at
https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
- self._update_glue_table(
- database_name=database_name,
- table_name=table_name,
- table_input=update_table_input,
- version_id=glue_table_version_id,
- )
+ # Pass `version_id` to implement optimistic locking: it ensures
updates are rejected if concurrent
+ # modifications occur. See more details at
https://iceberg.apache.org/docs/latest/aws/#optimistic-locking
+ self._update_glue_table(
+ database_name=database_name,
+ table_name=table_name,
+ table_input=update_table_input,
+ version_id=glue_table_version_id,
+ )
+
+ return CommitTableResponse(metadata=updated_metadata,
metadata_location=new_metadata_location)
+ except NoSuchTableError:
+ # Create the table
+ updated_metadata = construct_table_metadata(table_request.updates)
Review Comment:
I would expect `create_table` to be called from the `_commit` of the
`Transaction`.
##########
pyiceberg/catalog/__init__.py:
##########
@@ -288,6 +291,78 @@ def __init__(self, name: str, **properties: str):
def _load_file_io(self, properties: Properties = EMPTY_DICT, location:
Optional[str] = None) -> FileIO:
return load_file_io({**self.properties, **properties}, location)
+ def _create_staged_table(
+ self,
+ identifier: Union[str, Identifier],
+ schema: Union[Schema, "pa.Schema"],
+ location: Optional[str] = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> StagedTable:
+ """Create a table and return the table instance without committing the
changes.
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): Table's schema.
+ location (str | None): Location for the table. Optional Argument.
+ partition_spec (PartitionSpec): PartitionSpec for the table.
+ sort_order (SortOrder): SortOrder for the table.
+ properties (Properties): Table properties that can be a string
based dictionary.
+
+ Returns:
+ Table: the created table instance.
+
+ Raises:
+ TableAlreadyExistsError: If a table with the name already exists.
+ """
+ schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
+
+ database_name, table_name =
self.identifier_to_database_and_table(identifier)
+
+ location = self._resolve_table_location(location, database_name,
table_name)
+ metadata_location = self._get_metadata_location(location=location)
+ metadata = new_table_metadata(
+ location=location, schema=schema, partition_spec=partition_spec,
sort_order=sort_order, properties=properties
+ )
+ io = load_file_io(properties=self.properties,
location=metadata_location)
+ return StagedTable(
+ identifier=(self.name, database_name, table_name),
+ metadata=metadata,
+ metadata_location=metadata_location,
+ io=io,
+ catalog=self,
+ )
+
+ def create_table_transaction(
Review Comment:
WDYT?
```suggestion
def create_table_staged(
```
##########
pyiceberg/table/metadata.py:
##########
@@ -287,6 +287,31 @@ def _generate_snapshot_id() -> int:
return snapshot_id
+class IncompleteTableMetadata(TableMetadataCommonFields):
Review Comment:
Do you know if we need this class? I don't like that we must update the
return signature when updating the table metadata to `TableMetadata |
IncompleteTableMetadata`. While the name looks very similar, the classes have
no OOP relationship. When creating a table we need a UUID (which we can
generate), location, and an initial schema. Looking at the tests, we have all
of those.
##########
pyiceberg/catalog/__init__.py:
##########
@@ -288,6 +291,78 @@ def __init__(self, name: str, **properties: str):
def _load_file_io(self, properties: Properties = EMPTY_DICT, location:
Optional[str] = None) -> FileIO:
return load_file_io({**self.properties, **properties}, location)
+ def _create_staged_table(
+ self,
+ identifier: Union[str, Identifier],
+ schema: Union[Schema, "pa.Schema"],
+ location: Optional[str] = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> StagedTable:
+ """Create a table and return the table instance without committing the
changes.
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): Table's schema.
+ location (str | None): Location for the table. Optional Argument.
+ partition_spec (PartitionSpec): PartitionSpec for the table.
+ sort_order (SortOrder): SortOrder for the table.
+ properties (Properties): Table properties that can be a string
based dictionary.
+
+ Returns:
+ Table: the created table instance.
+
+ Raises:
+ TableAlreadyExistsError: If a table with the name already exists.
+ """
+ schema: Schema = self._convert_schema_if_needed(schema) # type: ignore
+
+ database_name, table_name =
self.identifier_to_database_and_table(identifier)
+
+ location = self._resolve_table_location(location, database_name,
table_name)
+ metadata_location = self._get_metadata_location(location=location)
+ metadata = new_table_metadata(
+ location=location, schema=schema, partition_spec=partition_spec,
sort_order=sort_order, properties=properties
+ )
+ io = load_file_io(properties=self.properties,
location=metadata_location)
+ return StagedTable(
+ identifier=(self.name, database_name, table_name),
+ metadata=metadata,
+ metadata_location=metadata_location,
+ io=io,
+ catalog=self,
+ )
+
+ def create_table_transaction(
+ self,
+ identifier: Union[str, Identifier],
+ schema: Union[Schema, "pa.Schema"],
+ location: Optional[str] = None,
+ partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+ sort_order: SortOrder = UNSORTED_SORT_ORDER,
+ properties: Properties = EMPTY_DICT,
+ ) -> CreateTableTransaction:
+ """Create a CreateTableTransaction.
+
+ Args:
+ identifier (str | Identifier): Table identifier.
+ schema (Schema): Table's schema.
+ location (str | None): Location for the table. Optional Argument.
+ partition_spec (PartitionSpec): PartitionSpec for the table.
+ sort_order (SortOrder): SortOrder for the table.
+ properties (Properties): Table properties that can be a string
based dictionary.
+
+ Returns:
+ CreateTableTransaction: createTableTransaction instance.
+
+ Raises:
+ TableAlreadyExistsError: If a table with the name already exists.
+ """
+ return CreateTableTransaction(
+ self._create_staged_table(identifier, schema, location,
partition_spec, sort_order, properties)
Review Comment:
Style nit: I would just inline this function, avoids scrolling :)
##########
tests/integration/test_writes.py:
##########
@@ -632,6 +632,60 @@ def test_write_and_evolve(session_catalog: Catalog,
format_version: int) -> None
snapshot_update.append_data_file(data_file)
+@pytest.mark.integration
+@pytest.mark.parametrize("format_version", [2])
+def test_create_table_transaction(session_catalog: Catalog, format_version:
int) -> None:
+ if format_version == 1:
+ pytest.skip(
+ "There is a bug in the REST catalog (maybe server side) that
prevents create and commit a staged version 1 table"
Review Comment:
Interesting, let me track this down.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]