geruh commented on code in PR #3058:
URL: https://github.com/apache/iceberg-python/pull/3058#discussion_r2818713084
##########
pyiceberg/catalog/glue.py:
##########
@@ -417,6 +420,110 @@ def _get_glue_table(self, database_name: str, table_name:
str) -> "TableTypeDef"
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
+ def _is_s3tables_database(self, database_name: str) -> bool:
+ """Check if a Glue database is federated with S3 Tables.
+
+ S3 Tables databases have a FederatedDatabase property with
+ ConnectionType set to aws:s3tables.
+
+ Args:
+ database_name: The name of the Glue database.
+
+ Returns:
+ True if the database is an S3 Tables federated database.
+ """
+ try:
+ database_response = self.glue.get_database(Name=database_name)
+ except self.glue.exceptions.EntityNotFoundException:
+ return False
+ database = database_response["Database"]
+ federated = database.get("FederatedDatabase", {})
+ return (federated.get("ConnectionType") or "").lower() ==
"aws:s3tables"
+
+ def _create_table_s3tables(
+ self,
+ identifier: str | Identifier,
Review Comment:
nit: there is some redundancy in the parameters passed in here (`identifier`,
`database_name`, `table_name`). Maybe we can derive the database and table names from `identifier`?
##########
pyiceberg/catalog/glue.py:
##########
@@ -417,6 +420,110 @@ def _get_glue_table(self, database_name: str, table_name:
str) -> "TableTypeDef"
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
+ def _is_s3tables_database(self, database_name: str) -> bool:
+ """Check if a Glue database is federated with S3 Tables.
+
+ S3 Tables databases have a FederatedDatabase property with
+ ConnectionType set to aws:s3tables.
+
+ Args:
+ database_name: The name of the Glue database.
+
+ Returns:
+ True if the database is an S3 Tables federated database.
+ """
+ try:
+ database_response = self.glue.get_database(Name=database_name)
+ except self.glue.exceptions.EntityNotFoundException:
+ return False
+ database = database_response["Database"]
+ federated = database.get("FederatedDatabase", {})
+ return (federated.get("ConnectionType") or "").lower() ==
"aws:s3tables"
+
+ def _create_table_s3tables(
+ self,
+ identifier: str | Identifier,
+ database_name: str,
+ table_name: str,
+ schema: Union[Schema, "pa.Schema"],
+ location: str | None,
+ partition_spec: PartitionSpec,
+ sort_order: SortOrder,
+ properties: Properties,
+ ) -> Table:
+ """Create an Iceberg table in an S3 Tables federated database.
+
+ S3 Tables manages storage internally, so the table location is not
known until the
+ table is created in the service. This method:
+ 1. Creates a minimal table entry in Glue (format=ICEBERG), which
causes S3 Tables
+ to allocate storage.
+ 2. Retrieves the managed storage location via GetTable.
+ 3. Writes Iceberg metadata to that location.
+ 4. Updates the Glue table entry with the metadata pointer.
+
+ On failure, the table created in step 1 is deleted.
+ """
+ if location is not None:
+ raise ValueError(
+ f"Cannot specify a location for S3 Tables table
{database_name}.{table_name}. "
+ "S3 Tables manages the storage location automatically."
+ )
+
+ # Create a minimal table in Glue so S3 Tables allocates storage.
+ self._create_glue_table(
+ database_name=database_name,
+ table_name=table_name,
+ table_input={
+ "Name": table_name,
+ "Parameters": {"format": "ICEBERG"},
+ },
+ )
+
+ try:
+ # Retrieve the managed storage location.
+ glue_table = self._get_glue_table(database_name=database_name,
table_name=table_name)
+ storage_descriptor = glue_table.get("StorageDescriptor", {})
+ managed_location = storage_descriptor.get("Location")
+ if not managed_location:
+ raise ValueError(f"S3 Tables did not assign a storage location
for {database_name}.{table_name}")
+
+ # Build the Iceberg metadata targeting the managed location.
+ staged_table = self._create_staged_table(
+ identifier=identifier,
+ schema=schema,
+ location=managed_location,
+ partition_spec=partition_spec,
+ sort_order=sort_order,
+ properties=properties,
+ )
+
+ # Write metadata and update the Glue table with the metadata
pointer.
+ self._write_metadata(staged_table.metadata, staged_table.io,
staged_table.metadata_location)
+ table_input = _construct_table_input(table_name,
staged_table.metadata_location, properties, staged_table.metadata)
+ version_id = glue_table.get("VersionId")
+ if not version_id:
+ raise CommitFailedException(
+ f"Cannot commit {database_name}.{table_name} because Glue
table version id is missing"
+ )
+ self._update_glue_table(
+ database_name=database_name,
+ table_name=table_name,
+ table_input=table_input,
+ version_id=version_id,
+ )
+ except Exception:
+ # Clean up the table created in step 1.
+ try:
+ self.glue.delete_table(DatabaseName=database_name,
Name=table_name)
+ except Exception:
+ logger.warning(
+ f"Failed to clean up S3 Tables table
{database_name}.{table_name}",
+ exc_info=logger.isEnabledFor(logging.DEBUG),
+ )
+ raise
+
+ return self.load_table(identifier=identifier)
Review Comment:
We could probably avoid the final `load_table` call here by using the
response from the Glue table update.
##########
pyiceberg/catalog/glue.py:
##########
@@ -417,6 +420,110 @@ def _get_glue_table(self, database_name: str, table_name:
str) -> "TableTypeDef"
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
+ def _is_s3tables_database(self, database_name: str) -> bool:
+ """Check if a Glue database is federated with S3 Tables.
+
+ S3 Tables databases have a FederatedDatabase property with
+ ConnectionType set to aws:s3tables.
+
+ Args:
+ database_name: The name of the Glue database.
+
+ Returns:
+ True if the database is an S3 Tables federated database.
+ """
+ try:
+ database_response = self.glue.get_database(Name=database_name)
+ except self.glue.exceptions.EntityNotFoundException:
+ return False
+ database = database_response["Database"]
+ federated = database.get("FederatedDatabase", {})
+ return (federated.get("ConnectionType") or "").lower() ==
"aws:s3tables"
Review Comment:
nit: can we extract this?
##########
pyiceberg/catalog/glue.py:
##########
@@ -417,6 +420,110 @@ def _get_glue_table(self, database_name: str, table_name:
str) -> "TableTypeDef"
except self.glue.exceptions.EntityNotFoundException as e:
raise NoSuchTableError(f"Table does not exist:
{database_name}.{table_name}") from e
+ def _is_s3tables_database(self, database_name: str) -> bool:
+ """Check if a Glue database is federated with S3 Tables.
+
+ S3 Tables databases have a FederatedDatabase property with
+ ConnectionType set to aws:s3tables.
+
+ Args:
+ database_name: The name of the Glue database.
+
+ Returns:
+ True if the database is an S3 Tables federated database.
+ """
+ try:
+ database_response = self.glue.get_database(Name=database_name)
+ except self.glue.exceptions.EntityNotFoundException:
+ return False
+ database = database_response["Database"]
+ federated = database.get("FederatedDatabase", {})
+ return (federated.get("ConnectionType") or "").lower() ==
"aws:s3tables"
+
+ def _create_table_s3tables(
+ self,
+ identifier: str | Identifier,
+ database_name: str,
+ table_name: str,
+ schema: Union[Schema, "pa.Schema"],
+ location: str | None,
+ partition_spec: PartitionSpec,
+ sort_order: SortOrder,
+ properties: Properties,
+ ) -> Table:
+ """Create an Iceberg table in an S3 Tables federated database.
+
+ S3 Tables manages storage internally, so the table location is not
known until the
+ table is created in the service. This method:
+ 1. Creates a minimal table entry in Glue (format=ICEBERG), which
causes S3 Tables
+ to allocate storage.
+ 2. Retrieves the managed storage location via GetTable.
+ 3. Writes Iceberg metadata to that location.
+ 4. Updates the Glue table entry with the metadata pointer.
+
+ On failure, the table created in step 1 is deleted.
+ """
+ if location is not None:
+ raise ValueError(
+ f"Cannot specify a location for S3 Tables table
{database_name}.{table_name}. "
+ "S3 Tables manages the storage location automatically."
+ )
+
+ # Create a minimal table in Glue so S3 Tables allocates storage.
+ self._create_glue_table(
+ database_name=database_name,
+ table_name=table_name,
+ table_input={
+ "Name": table_name,
+ "Parameters": {"format": "ICEBERG"},
+ },
+ )
+
+ try:
+ # Retrieve the managed storage location.
+ glue_table = self._get_glue_table(database_name=database_name,
table_name=table_name)
+ storage_descriptor = glue_table.get("StorageDescriptor", {})
+ managed_location = storage_descriptor.get("Location")
+ if not managed_location:
+ raise ValueError(f"S3 Tables did not assign a storage location
for {database_name}.{table_name}")
+
+ # Build the Iceberg metadata targeting the managed location.
+ staged_table = self._create_staged_table(
+ identifier=identifier,
+ schema=schema,
+ location=managed_location,
+ partition_spec=partition_spec,
+ sort_order=sort_order,
+ properties=properties,
+ )
+
+ # Write metadata and update the Glue table with the metadata
pointer.
+ self._write_metadata(staged_table.metadata, staged_table.io,
staged_table.metadata_location)
+ table_input = _construct_table_input(table_name,
staged_table.metadata_location, properties, staged_table.metadata)
+ version_id = glue_table.get("VersionId")
+ if not version_id:
+ raise CommitFailedException(
+ f"Cannot commit {database_name}.{table_name} because Glue
table version id is missing"
+ )
+ self._update_glue_table(
+ database_name=database_name,
+ table_name=table_name,
+ table_input=table_input,
+ version_id=version_id,
+ )
+ except Exception:
+ # Clean up the table created in step 1.
+ try:
+ self.glue.delete_table(DatabaseName=database_name,
Name=table_name)
Review Comment:
nice!
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]