kevinjqliu commented on code in PR #1429: URL: https://github.com/apache/iceberg-python/pull/1429#discussion_r1902235105
########## pyiceberg/catalog/s3tables.py: ########## @@ -0,0 +1,318 @@ +import re +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union + +import boto3 + +from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, PropertiesUpdateSummary +from pyiceberg.exceptions import ( + CommitFailedException, + InvalidNamespaceName, + InvalidTableName, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + S3TablesError, + TableAlreadyExistsError, + TableBucketNotFound, +) +from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile +from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.table.update import TableRequirement, TableUpdate +from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from pyiceberg.utils.properties import get_first_property_value + +if TYPE_CHECKING: + import pyarrow as pa + +S3TABLES_PROFILE_NAME = "s3tables.profile-name" +S3TABLES_REGION = "s3tables.region" +S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id" +S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key" +S3TABLES_SESSION_TOKEN = "s3tables.session-token" + +S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn" + +S3TABLES_ENDPOINT = "s3tables.endpoint" + +# for naming rules see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html +S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}") +S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata" + + +class S3TableCatalog(MetastoreCatalog): + def __init__(self, name: str, **properties: str): + super().__init__(name, **properties) + + self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN] + Review Comment: nit: since PyArrowFileIO doesn't work right now, let's throw an error when it's used ########## pyiceberg/catalog/s3tables.py: ########## @@ -0,0 +1,318 @@ +import re +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union + +import boto3 + +from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, PropertiesUpdateSummary +from pyiceberg.exceptions import ( + CommitFailedException, + InvalidNamespaceName, + InvalidTableName, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + S3TablesError, + TableAlreadyExistsError, + TableBucketNotFound, +) +from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile +from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.table.update import TableRequirement, TableUpdate +from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from pyiceberg.utils.properties import get_first_property_value + +if TYPE_CHECKING: + import pyarrow as pa + +S3TABLES_PROFILE_NAME = "s3tables.profile-name" +S3TABLES_REGION = "s3tables.region" +S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id" +S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key" +S3TABLES_SESSION_TOKEN = 
"s3tables.session-token" + +S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn" + +S3TABLES_ENDPOINT = "s3tables.endpoint" + +# for naming rules see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html +S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}") +S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata" + + +class S3TableCatalog(MetastoreCatalog): + def __init__(self, name: str, **properties: str): + super().__init__(name, **properties) + + self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN] + + session = boto3.Session( + profile_name=properties.get(S3TABLES_PROFILE_NAME), + region_name=get_first_property_value(properties, S3TABLES_REGION, AWS_REGION), + botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION), + aws_access_key_id=get_first_property_value(properties, S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), + aws_secret_access_key=get_first_property_value(properties, S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), + aws_session_token=get_first_property_value(properties, S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN), + ) + try: + self.s3tables = session.client("s3tables", endpoint_url=properties.get(S3TABLES_ENDPOINT)) + except boto3.session.UnknownServiceError as e: + raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current version: {boto3.__version__}.") from e + + try: + self.s3tables.get_table_bucket(tableBucketARN=self.table_bucket_arn) + except self.s3tables.exceptions.NotFoundException as e: + raise TableBucketNotFound(e) from e + + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + table_identifier = table.name() + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) + + current_table, version_token = self._load_table_and_version(identifier=table_identifier) Review Comment: this is overriding the provided `table` with `current_table`, is this the right behavior? ########## pyiceberg/catalog/s3tables.py: ########## Review Comment: we'd want to add this to the docs https://py.iceberg.apache.org/configuration/#catalogs ########## tests/catalog/test_s3tables.py: ########## Review Comment: since this requires `os.environ["ARN"]` to be set, not sure we can check this in to the repo right now ########## tests/catalog/test_s3tables.py: ########## @@ -0,0 +1,180 @@ +import uuid + +import boto3 +import pytest + +from pyiceberg.catalog.s3tables import S3TableCatalog +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound +from pyiceberg.schema import Schema +from pyiceberg.types import IntegerType + + +@pytest.fixture +def database_name(database_name: str) -> str: + # naming rules prevent "-" in namespaces for s3 table buckets + return database_name.replace("-", "_") + + +@pytest.fixture +def table_name(table_name: str) -> str: + # naming rules prevent "-" in table namees for s3 table buckets + return table_name.replace("-", "_") + + +@pytest.fixture +def table_bucket_arn() -> str: + import os + + # since the moto library does not support s3tables as of 2024-12-14 we have to test against a real AWS endpoint + # in one of the supported regions. 
+ + return os.environ["ARN"] + + +@pytest.fixture +def catalog(table_bucket_arn: str) -> S3TableCatalog: + # pyarrow does not support writing to S3 Table buckets as of 2024-12-14 https://github.com/apache/iceberg-python/issues/1404#issuecomment-2543174146 + properties = {"s3tables.table-bucket-arn": table_bucket_arn, "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"} + return S3TableCatalog(name="test_s3tables_catalog", **properties) + + +def test_s3tables_api_raises_on_conflicting_version_tokens(table_bucket_arn: str, database_name: str, table_name: str) -> None: + client = boto3.client("s3tables") + client.create_namespace(tableBucketARN=table_bucket_arn, namespace=[database_name]) + response = client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + version_token = response["versionToken"] + scrambled_version_token = version_token[::-1] + + warehouse_location = client.get_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name)[ + "warehouseLocation" + ] + metadata_location = f"{warehouse_location}/metadata/00001-{uuid.uuid4()}.metadata.json" + + with pytest.raises(client.exceptions.ConflictException): + client.update_table_metadata_location( + tableBucketARN=table_bucket_arn, + namespace=database_name, + name=table_name, + versionToken=scrambled_version_token, + metadataLocation=metadata_location, + ) + + +def test_s3tables_api_raises_on_preexisting_table(table_bucket_arn: str, database_name: str, table_name: str) -> None: + client = boto3.client("s3tables") + client.create_namespace(tableBucketARN=table_bucket_arn, namespace=[database_name]) + client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + with pytest.raises(client.exceptions.ConflictException): + client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + + +def test_creating_catalog_validates_s3_table_bucket_exists(table_bucket_arn: str) -> None: + properties = {"s3tables.table-bucket-arn": f"{table_bucket_arn}-modified"} + with pytest.raises(TableBucketNotFound): + S3TableCatalog(name="test_s3tables_catalog", **properties) + + +def test_create_namespace(catalog: S3TableCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + namespaces = catalog.list_namespaces() + assert (database_name,) in namespaces + + +def test_load_namespace_properties(catalog: S3TableCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert database_name in catalog.load_namespace_properties(database_name)["namespace"] + + +def test_drop_namespace(catalog: S3TableCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert (database_name,) in catalog.list_namespaces() + catalog.drop_namespace(namespace=database_name) + assert (database_name,) not in catalog.list_namespaces() + + +def test_create_table(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + assert table == catalog.load_table(identifier) + + +def test_create_table_in_invalid_namespace_raises_exception( + catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema +) -> None: + identifier = (database_name, table_name) + + with 
pytest.raises(NoSuchNamespaceError): + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + +def test_table_exists(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + assert not catalog.table_exists(identifier=identifier) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.table_exists(identifier=identifier) + + +def test_rename_table(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + to_database_name = f"{database_name}new" + to_table_name = f"{table_name}new" + to_identifier = (to_database_name, to_table_name) + catalog.create_namespace(namespace=to_database_name) + catalog.rename_table(from_identifier=identifier, to_identifier=to_identifier) + + assert not catalog.table_exists(identifier=identifier) + assert catalog.table_exists(identifier=to_identifier) + + +def test_list_tables(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + assert not catalog.list_tables(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.list_tables(namespace=database_name) + + +def test_drop_table(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + catalog.drop_table(identifier=identifier) + + with pytest.raises(NoSuchTableError): + catalog.load_table(identifier=identifier) + + +def test_commit_table(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + transaction = table.transaction() + update = transaction.update_schema() + update.add_column(path="b", field_type=IntegerType()) + update.commit() + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert updated_table_metadata.current_schema_id == 1 + assert len(updated_table_metadata.schemas) == 2 + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms Review Comment: nit: also check the schema for the new column ########## pyiceberg/catalog/s3tables.py: ########## @@ -0,0 +1,318 @@ +import re +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union + +import boto3 + +from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, PropertiesUpdateSummary +from pyiceberg.exceptions import ( + CommitFailedException, + InvalidNamespaceName, + 
InvalidTableName, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + S3TablesError, + TableAlreadyExistsError, + TableBucketNotFound, +) +from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile +from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.table.update import TableRequirement, TableUpdate +from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from pyiceberg.utils.properties import get_first_property_value + +if TYPE_CHECKING: + import pyarrow as pa + +S3TABLES_PROFILE_NAME = "s3tables.profile-name" +S3TABLES_REGION = "s3tables.region" +S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id" +S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key" +S3TABLES_SESSION_TOKEN = "s3tables.session-token" + +S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn" + +S3TABLES_ENDPOINT = "s3tables.endpoint" + +# for naming rules see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html +S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}") +S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata" + + +class S3TableCatalog(MetastoreCatalog): + def __init__(self, name: str, **properties: str): + super().__init__(name, **properties) + + self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN] + + session = boto3.Session( + profile_name=properties.get(S3TABLES_PROFILE_NAME), + region_name=get_first_property_value(properties, S3TABLES_REGION, AWS_REGION), + botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION), + aws_access_key_id=get_first_property_value(properties, S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), + aws_secret_access_key=get_first_property_value(properties, S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), + aws_session_token=get_first_property_value(properties, S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN), + ) + try: + self.s3tables = session.client("s3tables", endpoint_url=properties.get(S3TABLES_ENDPOINT)) + except boto3.session.UnknownServiceError as e: + raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current version: {boto3.__version__}.") from e + + try: + self.s3tables.get_table_bucket(tableBucketARN=self.table_bucket_arn) + except self.s3tables.exceptions.NotFoundException as e: + raise TableBucketNotFound(e) from e + + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] 
+ ) -> CommitTableResponse: + table_identifier = table.name() + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) + + current_table, version_token = self._load_table_and_version(identifier=table_identifier) + + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) + if current_table and updated_staged_table.metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) + + self._write_metadata( + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, + overwrite=True, + ) + + # try to update metadata location which will fail if the versionToken changed meanwhile + try: + self.s3tables.update_table_metadata_location( + tableBucketARN=self.table_bucket_arn, + namespace=database_name, + name=table_name, + versionToken=version_token, + metadataLocation=updated_staged_table.metadata_location, + ) + except self.s3tables.exceptions.ConflictException as e: + raise CommitFailedException( + f"Cannot commit {database_name}.{table_name} because of a concurrent update to the table version {version_token}." + ) from e + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) + + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: + valid_namespace: str = self._validate_namespace_identifier(namespace) + self.s3tables.create_namespace(tableBucketARN=self.table_bucket_arn, namespace=[valid_namespace]) Review Comment: does `create_namespace` support `properties`? ########## tests/catalog/test_s3tables.py: ########## @@ -0,0 +1,180 @@ +import uuid + +import boto3 +import pytest + +from pyiceberg.catalog.s3tables import S3TableCatalog +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound +from pyiceberg.schema import Schema +from pyiceberg.types import IntegerType + + +@pytest.fixture +def database_name(database_name: str) -> str: + # naming rules prevent "-" in namespaces for s3 table buckets + return database_name.replace("-", "_") + + +@pytest.fixture +def table_name(table_name: str) -> str: + # naming rules prevent "-" in table namees for s3 table buckets + return table_name.replace("-", "_") + + +@pytest.fixture +def table_bucket_arn() -> str: + import os + + # since the moto library does not support s3tables as of 2024-12-14 we have to test against a real AWS endpoint + # in one of the supported regions. 
+ + return os.environ["ARN"] + + +@pytest.fixture +def catalog(table_bucket_arn: str) -> S3TableCatalog: + # pyarrow does not support writing to S3 Table buckets as of 2024-12-14 https://github.com/apache/iceberg-python/issues/1404#issuecomment-2543174146 + properties = {"s3tables.table-bucket-arn": table_bucket_arn, "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"} + return S3TableCatalog(name="test_s3tables_catalog", **properties) + + +def test_s3tables_api_raises_on_conflicting_version_tokens(table_bucket_arn: str, database_name: str, table_name: str) -> None: + client = boto3.client("s3tables") + client.create_namespace(tableBucketARN=table_bucket_arn, namespace=[database_name]) + response = client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + version_token = response["versionToken"] + scrambled_version_token = version_token[::-1] + + warehouse_location = client.get_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name)[ + "warehouseLocation" + ] + metadata_location = f"{warehouse_location}/metadata/00001-{uuid.uuid4()}.metadata.json" + + with pytest.raises(client.exceptions.ConflictException): + client.update_table_metadata_location( + tableBucketARN=table_bucket_arn, + namespace=database_name, + name=table_name, + versionToken=scrambled_version_token, + metadataLocation=metadata_location, + ) + + +def test_s3tables_api_raises_on_preexisting_table(table_bucket_arn: str, database_name: str, table_name: str) -> None: + client = boto3.client("s3tables") + client.create_namespace(tableBucketARN=table_bucket_arn, namespace=[database_name]) + client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + with pytest.raises(client.exceptions.ConflictException): + client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") Review Comment: nit, these are testing boto3 client, do we need these tests here? ########## tests/catalog/test_s3tables.py: ########## @@ -0,0 +1,180 @@ +import uuid + +import boto3 +import pytest + +from pyiceberg.catalog.s3tables import S3TableCatalog +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound +from pyiceberg.schema import Schema +from pyiceberg.types import IntegerType + + +@pytest.fixture +def database_name(database_name: str) -> str: + # naming rules prevent "-" in namespaces for s3 table buckets + return database_name.replace("-", "_") + + +@pytest.fixture +def table_name(table_name: str) -> str: + # naming rules prevent "-" in table namees for s3 table buckets + return table_name.replace("-", "_") + + +@pytest.fixture +def table_bucket_arn() -> str: + import os + + # since the moto library does not support s3tables as of 2024-12-14 we have to test against a real AWS endpoint + # in one of the supported regions. 
+ + return os.environ["ARN"] + + +@pytest.fixture +def catalog(table_bucket_arn: str) -> S3TableCatalog: + # pyarrow does not support writing to S3 Table buckets as of 2024-12-14 https://github.com/apache/iceberg-python/issues/1404#issuecomment-2543174146 + properties = {"s3tables.table-bucket-arn": table_bucket_arn, "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"} + return S3TableCatalog(name="test_s3tables_catalog", **properties) + + +def test_s3tables_api_raises_on_conflicting_version_tokens(table_bucket_arn: str, database_name: str, table_name: str) -> None: + client = boto3.client("s3tables") + client.create_namespace(tableBucketARN=table_bucket_arn, namespace=[database_name]) + response = client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + version_token = response["versionToken"] + scrambled_version_token = version_token[::-1] + + warehouse_location = client.get_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name)[ + "warehouseLocation" + ] + metadata_location = f"{warehouse_location}/metadata/00001-{uuid.uuid4()}.metadata.json" + + with pytest.raises(client.exceptions.ConflictException): + client.update_table_metadata_location( + tableBucketARN=table_bucket_arn, + namespace=database_name, + name=table_name, + versionToken=scrambled_version_token, + metadataLocation=metadata_location, + ) + + +def test_s3tables_api_raises_on_preexisting_table(table_bucket_arn: str, database_name: str, table_name: str) -> None: + client = boto3.client("s3tables") + client.create_namespace(tableBucketARN=table_bucket_arn, namespace=[database_name]) + client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + with pytest.raises(client.exceptions.ConflictException): + client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + + +def test_creating_catalog_validates_s3_table_bucket_exists(table_bucket_arn: str) -> None: + properties = {"s3tables.table-bucket-arn": f"{table_bucket_arn}-modified"} + with pytest.raises(TableBucketNotFound): + S3TableCatalog(name="test_s3tables_catalog", **properties) + + +def test_create_namespace(catalog: S3TableCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + namespaces = catalog.list_namespaces() + assert (database_name,) in namespaces + + +def test_load_namespace_properties(catalog: S3TableCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert database_name in catalog.load_namespace_properties(database_name)["namespace"] Review Comment: nit: is this an S3 Tables-specific behavior? 
I don't think the `namespace` property is set automatically in other catalogs ########## tests/catalog/test_s3tables.py: ########## @@ -0,0 +1,180 @@ +import uuid + +import boto3 +import pytest + +from pyiceberg.catalog.s3tables import S3TableCatalog +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound +from pyiceberg.schema import Schema +from pyiceberg.types import IntegerType + + +@pytest.fixture +def database_name(database_name: str) -> str: + # naming rules prevent "-" in namespaces for s3 table buckets + return database_name.replace("-", "_") + + +@pytest.fixture +def table_name(table_name: str) -> str: + # naming rules prevent "-" in table namees for s3 table buckets + return table_name.replace("-", "_") + + +@pytest.fixture +def table_bucket_arn() -> str: + import os + + # since the moto library does not support s3tables as of 2024-12-14 we have to test against a real AWS endpoint + # in one of the supported regions. + + return os.environ["ARN"] + + +@pytest.fixture +def catalog(table_bucket_arn: str) -> S3TableCatalog: + # pyarrow does not support writing to S3 Table buckets as of 2024-12-14 https://github.com/apache/iceberg-python/issues/1404#issuecomment-2543174146 + properties = {"s3tables.table-bucket-arn": table_bucket_arn, "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"} + return S3TableCatalog(name="test_s3tables_catalog", **properties) + + Review Comment: should we add a test for read/write? ########## pyiceberg/catalog/s3tables.py: ########## @@ -0,0 +1,318 @@ +import re +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union + +import boto3 + +from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, PropertiesUpdateSummary +from pyiceberg.exceptions import ( + CommitFailedException, + InvalidNamespaceName, + InvalidTableName, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + S3TablesError, + TableAlreadyExistsError, + TableBucketNotFound, +) +from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile +from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.table.update import TableRequirement, TableUpdate +from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from pyiceberg.utils.properties import get_first_property_value + +if TYPE_CHECKING: + import pyarrow as pa + +S3TABLES_PROFILE_NAME = "s3tables.profile-name" +S3TABLES_REGION = "s3tables.region" +S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id" +S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key" +S3TABLES_SESSION_TOKEN = "s3tables.session-token" + +S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn" + +S3TABLES_ENDPOINT = "s3tables.endpoint" + +# for naming rules see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html +S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}") +S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata" + + +class S3TableCatalog(MetastoreCatalog): + def __init__(self, name: str, **properties: str): + super().__init__(name, **properties) + + self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN] + + session = boto3.Session( + 
profile_name=properties.get(S3TABLES_PROFILE_NAME), + region_name=get_first_property_value(properties, S3TABLES_REGION, AWS_REGION), + botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION), + aws_access_key_id=get_first_property_value(properties, S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), + aws_secret_access_key=get_first_property_value(properties, S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), + aws_session_token=get_first_property_value(properties, S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN), + ) + try: + self.s3tables = session.client("s3tables", endpoint_url=properties.get(S3TABLES_ENDPOINT)) + except boto3.session.UnknownServiceError as e: + raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current version: {boto3.__version__}.") from e + + try: + self.s3tables.get_table_bucket(tableBucketARN=self.table_bucket_arn) + except self.s3tables.exceptions.NotFoundException as e: + raise TableBucketNotFound(e) from e + + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] + ) -> CommitTableResponse: + table_identifier = table.name() + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) + + current_table, version_token = self._load_table_and_version(identifier=table_identifier) + + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) + if current_table and updated_staged_table.metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) + + self._write_metadata( + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, + overwrite=True, + ) + + # try to update metadata location which will fail if the versionToken changed meanwhile + try: + self.s3tables.update_table_metadata_location( + tableBucketARN=self.table_bucket_arn, + namespace=database_name, + name=table_name, + versionToken=version_token, + metadataLocation=updated_staged_table.metadata_location, + ) + except self.s3tables.exceptions.ConflictException as e: + raise CommitFailedException( + f"Cannot commit {database_name}.{table_name} because of a concurrent update to the table version {version_token}." 
+ ) from e + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) + + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: + valid_namespace: str = self._validate_namespace_identifier(namespace) + self.s3tables.create_namespace(tableBucketARN=self.table_bucket_arn, namespace=[valid_namespace]) + + def _validate_namespace_identifier(self, namespace: Union[str, Identifier]) -> str: + namespace = self.identifier_to_database(namespace) + + if not S3TABLES_VALID_NAME_REGEX.fullmatch(namespace) or namespace == S3TABLES_RESERVED_NAMESPACE: + raise InvalidNamespaceName("The specified namespace name is not valid.") + + return namespace + + def _validate_database_and_table_identifier(self, identifier: Union[str, Identifier]) -> Tuple[str, str]: + namespace, table_name = self.identifier_to_database_and_table(identifier) + + namespace = self._validate_namespace_identifier(namespace) + + if not S3TABLES_VALID_NAME_REGEX.fullmatch(table_name): + raise InvalidTableName("The specified table name is not valid.") + + return namespace, table_name + + def create_table( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + namespace, table_name = self._validate_database_and_table_identifier(identifier) Review Comment: can you document the process here? `s3tables.create_table ` creates an empty table and then we write the new metadata json ########## tests/catalog/test_s3tables.py: ########## @@ -0,0 +1,180 @@ +import uuid + +import boto3 +import pytest + +from pyiceberg.catalog.s3tables import S3TableCatalog +from pyiceberg.exceptions import NoSuchNamespaceError, NoSuchTableError, TableBucketNotFound +from pyiceberg.schema import Schema +from pyiceberg.types import IntegerType + + +@pytest.fixture +def database_name(database_name: str) -> str: + # naming rules prevent "-" in namespaces for s3 table buckets + return database_name.replace("-", "_") + + +@pytest.fixture +def table_name(table_name: str) -> str: + # naming rules prevent "-" in table namees for s3 table buckets + return table_name.replace("-", "_") + + +@pytest.fixture +def table_bucket_arn() -> str: + import os + + # since the moto library does not support s3tables as of 2024-12-14 we have to test against a real AWS endpoint + # in one of the supported regions. 
+ + return os.environ["ARN"] + + +@pytest.fixture +def catalog(table_bucket_arn: str) -> S3TableCatalog: + # pyarrow does not support writing to S3 Table buckets as of 2024-12-14 https://github.com/apache/iceberg-python/issues/1404#issuecomment-2543174146 + properties = {"s3tables.table-bucket-arn": table_bucket_arn, "py-io-impl": "pyiceberg.io.fsspec.FsspecFileIO"} + return S3TableCatalog(name="test_s3tables_catalog", **properties) + + +def test_s3tables_api_raises_on_conflicting_version_tokens(table_bucket_arn: str, database_name: str, table_name: str) -> None: + client = boto3.client("s3tables") + client.create_namespace(tableBucketARN=table_bucket_arn, namespace=[database_name]) + response = client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + version_token = response["versionToken"] + scrambled_version_token = version_token[::-1] + + warehouse_location = client.get_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name)[ + "warehouseLocation" + ] + metadata_location = f"{warehouse_location}/metadata/00001-{uuid.uuid4()}.metadata.json" + + with pytest.raises(client.exceptions.ConflictException): + client.update_table_metadata_location( + tableBucketARN=table_bucket_arn, + namespace=database_name, + name=table_name, + versionToken=scrambled_version_token, + metadataLocation=metadata_location, + ) + + +def test_s3tables_api_raises_on_preexisting_table(table_bucket_arn: str, database_name: str, table_name: str) -> None: + client = boto3.client("s3tables") + client.create_namespace(tableBucketARN=table_bucket_arn, namespace=[database_name]) + client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + with pytest.raises(client.exceptions.ConflictException): + client.create_table(tableBucketARN=table_bucket_arn, namespace=database_name, name=table_name, format="ICEBERG") + + +def test_creating_catalog_validates_s3_table_bucket_exists(table_bucket_arn: str) -> None: + properties = {"s3tables.table-bucket-arn": f"{table_bucket_arn}-modified"} + with pytest.raises(TableBucketNotFound): + S3TableCatalog(name="test_s3tables_catalog", **properties) + + +def test_create_namespace(catalog: S3TableCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + namespaces = catalog.list_namespaces() + assert (database_name,) in namespaces + + +def test_load_namespace_properties(catalog: S3TableCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert database_name in catalog.load_namespace_properties(database_name)["namespace"] + + +def test_drop_namespace(catalog: S3TableCatalog, database_name: str) -> None: + catalog.create_namespace(namespace=database_name) + assert (database_name,) in catalog.list_namespaces() + catalog.drop_namespace(namespace=database_name) + assert (database_name,) not in catalog.list_namespaces() + + +def test_create_table(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + assert table == catalog.load_table(identifier) + + +def test_create_table_in_invalid_namespace_raises_exception( + catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema +) -> None: + identifier = (database_name, table_name) + + with 
pytest.raises(NoSuchNamespaceError): + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + +def test_table_exists(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + assert not catalog.table_exists(identifier=identifier) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.table_exists(identifier=identifier) + + +def test_rename_table(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + to_database_name = f"{database_name}new" + to_table_name = f"{table_name}new" + to_identifier = (to_database_name, to_table_name) + catalog.create_namespace(namespace=to_database_name) + catalog.rename_table(from_identifier=identifier, to_identifier=to_identifier) + + assert not catalog.table_exists(identifier=identifier) + assert catalog.table_exists(identifier=to_identifier) + + +def test_list_tables(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + assert not catalog.list_tables(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + assert catalog.list_tables(namespace=database_name) + + +def test_drop_table(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + catalog.create_table(identifier=identifier, schema=table_schema_nested) + + catalog.drop_table(identifier=identifier) + + with pytest.raises(NoSuchTableError): + catalog.load_table(identifier=identifier) + + +def test_commit_table(catalog: S3TableCatalog, database_name: str, table_name: str, table_schema_nested: Schema) -> None: + identifier = (database_name, table_name) + + catalog.create_namespace(namespace=database_name) + table = catalog.create_table(identifier=identifier, schema=table_schema_nested) + + last_updated_ms = table.metadata.last_updated_ms + original_table_metadata_location = table.metadata_location + original_table_last_updated_ms = table.metadata.last_updated_ms + + transaction = table.transaction() + update = transaction.update_schema() + update.add_column(path="b", field_type=IntegerType()) + update.commit() + transaction.commit_transaction() + + updated_table_metadata = table.metadata + assert updated_table_metadata.current_schema_id == 1 + assert len(updated_table_metadata.schemas) == 2 + assert updated_table_metadata.last_updated_ms > last_updated_ms + assert updated_table_metadata.metadata_log[0].metadata_file == original_table_metadata_location + assert updated_table_metadata.metadata_log[0].timestamp_ms == original_table_last_updated_ms Review Comment: also maybe test committing new data as well ########## pyiceberg/catalog/s3tables.py: ########## @@ -0,0 +1,318 @@ +import re +from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union + +import boto3 + +from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, PropertiesUpdateSummary +from pyiceberg.exceptions import ( + CommitFailedException, + InvalidNamespaceName, + InvalidTableName, 
+ NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + S3TablesError, + TableAlreadyExistsError, + TableBucketNotFound, +) +from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, AWS_SESSION_TOKEN, load_file_io +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.serializers import FromInputFile +from pyiceberg.table import CommitTableResponse, Table +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.table.update import TableRequirement, TableUpdate +from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties +from pyiceberg.utils.properties import get_first_property_value + +if TYPE_CHECKING: + import pyarrow as pa + +S3TABLES_PROFILE_NAME = "s3tables.profile-name" +S3TABLES_REGION = "s3tables.region" +S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id" +S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key" +S3TABLES_SESSION_TOKEN = "s3tables.session-token" + +S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn" + +S3TABLES_ENDPOINT = "s3tables.endpoint" + +# for naming rules see: https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html +S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}") +S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata" + + +class S3TableCatalog(MetastoreCatalog): + def __init__(self, name: str, **properties: str): + super().__init__(name, **properties) + + self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN] + + session = boto3.Session( + profile_name=properties.get(S3TABLES_PROFILE_NAME), + region_name=get_first_property_value(properties, S3TABLES_REGION, AWS_REGION), + botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION), + aws_access_key_id=get_first_property_value(properties, S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID), + aws_secret_access_key=get_first_property_value(properties, S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY), + aws_session_token=get_first_property_value(properties, S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN), + ) + try: + self.s3tables = session.client("s3tables", endpoint_url=properties.get(S3TABLES_ENDPOINT)) + except boto3.session.UnknownServiceError as e: + raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current version: {boto3.__version__}.") from e + + try: + self.s3tables.get_table_bucket(tableBucketARN=self.table_bucket_arn) + except self.s3tables.exceptions.NotFoundException as e: + raise TableBucketNotFound(e) from e + + def commit_table( + self, table: Table, requirements: Tuple[TableRequirement, ...], updates: Tuple[TableUpdate, ...] 
+ ) -> CommitTableResponse: + table_identifier = table.name() + database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) + + current_table, version_token = self._load_table_and_version(identifier=table_identifier) + + updated_staged_table = self._update_and_stage_table(current_table, table_identifier, requirements, updates) + if current_table and updated_staged_table.metadata == current_table.metadata: + # no changes, do nothing + return CommitTableResponse(metadata=current_table.metadata, metadata_location=current_table.metadata_location) + + self._write_metadata( + metadata=updated_staged_table.metadata, + io=updated_staged_table.io, + metadata_path=updated_staged_table.metadata_location, + overwrite=True, + ) + + # try to update metadata location which will fail if the versionToken changed meanwhile + try: + self.s3tables.update_table_metadata_location( + tableBucketARN=self.table_bucket_arn, + namespace=database_name, + name=table_name, + versionToken=version_token, + metadataLocation=updated_staged_table.metadata_location, + ) + except self.s3tables.exceptions.ConflictException as e: + raise CommitFailedException( + f"Cannot commit {database_name}.{table_name} because of a concurrent update to the table version {version_token}." + ) from e + return CommitTableResponse( + metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location + ) + + def create_namespace(self, namespace: Union[str, Identifier], properties: Properties = EMPTY_DICT) -> None: + valid_namespace: str = self._validate_namespace_identifier(namespace) + self.s3tables.create_namespace(tableBucketARN=self.table_bucket_arn, namespace=[valid_namespace]) + + def _validate_namespace_identifier(self, namespace: Union[str, Identifier]) -> str: + namespace = self.identifier_to_database(namespace) + + if not S3TABLES_VALID_NAME_REGEX.fullmatch(namespace) or namespace == S3TABLES_RESERVED_NAMESPACE: + raise InvalidNamespaceName("The specified namespace name is not valid.") + + return namespace + + def _validate_database_and_table_identifier(self, identifier: Union[str, Identifier]) -> Tuple[str, str]: + namespace, table_name = self.identifier_to_database_and_table(identifier) + + namespace = self._validate_namespace_identifier(namespace) + + if not S3TABLES_VALID_NAME_REGEX.fullmatch(table_name): + raise InvalidTableName("The specified table name is not valid.") + + return namespace, table_name + + def create_table( + self, + identifier: Union[str, Identifier], + schema: Union[Schema, "pa.Schema"], + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + ) -> Table: + namespace, table_name = self._validate_database_and_table_identifier(identifier) + + schema: Schema = self._convert_schema_if_needed(schema) # type: ignore + + try: + self.s3tables.create_table( + tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name, format="ICEBERG" + ) + except self.s3tables.exceptions.NotFoundException as e: + raise NoSuchNamespaceError(f"Cannot create {namespace}.{table_name} because no such namespace exists.") from e + except self.s3tables.exceptions.ConflictException as e: + raise TableAlreadyExistsError( + f"Cannot create {namespace}.{table_name} because a table of the same name already exists in the namespace." 
+ ) from e + + response = self.s3tables.get_table(tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name) + version_token = response["versionToken"] + + warehouse_location = response["warehouseLocation"] + metadata_location = self._get_metadata_location(location=warehouse_location) + metadata = new_table_metadata( + location=warehouse_location, + schema=schema, + partition_spec=partition_spec, + sort_order=sort_order, + properties=properties, + ) + + io = load_file_io(properties=self.properties, location=metadata_location) + # this triggers unsupported list operation error, setting overwrite=True is a workaround for now + self._write_metadata(metadata, io, metadata_location, overwrite=True) + + try: + self.s3tables.update_table_metadata_location( + tableBucketARN=self.table_bucket_arn, + namespace=namespace, + name=table_name, + versionToken=version_token, + metadataLocation=metadata_location, + ) + except self.s3tables.exceptions.ConflictException as e: + raise CommitFailedException( + f"Cannot create {namespace}.{table_name} because of a concurrent update to the table version {version_token}." + ) from e + + return self.load_table(identifier=identifier) + + def drop_namespace(self, namespace: Union[str, Identifier]) -> None: + namespace = self._validate_namespace_identifier(namespace) + try: + self.s3tables.delete_namespace(tableBucketARN=self.table_bucket_arn, namespace=namespace) + except self.s3tables.exceptions.ConflictException as e: + raise NamespaceNotEmptyError(f"Namespace {namespace} is not empty.") from e + + def drop_table(self, identifier: Union[str, Identifier]) -> None: + namespace, table_name = self._validate_database_and_table_identifier(identifier) + try: + response = self.s3tables.get_table(tableBucketARN=self.table_bucket_arn, namespace=namespace, name=table_name) + except self.s3tables.exceptions.NotFoundException as e: + raise NoSuchTableError(f"No table with identifier {identifier} exists.") from e + + version_token = response["versionToken"] + try: + self.s3tables.delete_table( + tableBucketARN=self.table_bucket_arn, + namespace=namespace, + name=table_name, + versionToken=version_token, + ) + except self.s3tables.exceptions.ConflictException as e: + raise CommitFailedException( + f"Cannot delete {namespace}.{table_name} because of a concurrent update to the table version {version_token}." 
+ ) from e + + def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: + if namespace: + namespace = self._validate_namespace_identifier(namespace) + paginator = self.s3tables.get_paginator("list_namespaces") + + namespaces: List[Identifier] = [] + for page in paginator.paginate(tableBucketARN=self.table_bucket_arn): + namespaces.extend(tuple(entry["namespace"]) for entry in page["namespaces"]) + + return namespaces + + def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: + namespace = self._validate_namespace_identifier(namespace) + paginator = self.s3tables.get_paginator("list_tables") + tables: List[Identifier] = [] + for page in paginator.paginate(tableBucketARN=self.table_bucket_arn, namespace=namespace): + tables.extend((namespace, table["name"]) for table in page["tables"]) + return tables + + def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: + namespace = self._validate_namespace_identifier(namespace) + response = self.s3tables.get_namespace(tableBucketARN=self.table_bucket_arn, namespace=namespace) + return { + "namespace": response["namespace"], + "createdAt": response["createdAt"], + "createdBy": response["createdBy"], + "ownerAccountId": response["ownerAccountId"], + } Review Comment: this deviates from other catalog implementations. is there a reason why we want to set these?
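On the point above: if the extra metadata is worth keeping, one way to stay closer to the string-valued `Properties` other catalogs return is sketched below. This is illustrative only; serializing `createdAt` with `isoformat()` and dropping the echoed `namespace` key are assumptions, not part of the PR.

```python
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties:
    namespace = self._validate_namespace_identifier(namespace)
    response = self.s3tables.get_namespace(tableBucketARN=self.table_bucket_arn, namespace=namespace)
    # return only plain string values, consistent with other catalog implementations
    return {
        "createdAt": response["createdAt"].isoformat(),  # boto3 deserializes timestamps to datetime
        "createdBy": response["createdBy"],
        "ownerAccountId": response["ownerAccountId"],
    }
```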
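Similarly, for the earlier request to document the `create_table` process, a docstring along these lines would capture what the implementation in this PR does (sketch wording only; parameters elided):

```python
def create_table(self, identifier, schema, **kwargs):
    """Create an Iceberg table in the configured S3 table bucket.

    The flow is:
    1. `s3tables.create_table` registers an empty table with format="ICEBERG";
       `s3tables.get_table` then returns its versionToken and warehouseLocation.
    2. The initial metadata JSON file is written under that warehouse location.
    3. `s3tables.update_table_metadata_location` points the table at the new
       metadata file; the versionToken makes this an optimistic-concurrency
       check, and a ConflictException is surfaced as CommitFailedException.
    """
```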