HonahX commented on code in PR #1429:
URL: https://github.com/apache/iceberg-python/pull/1429#discussion_r1904584229


##########
pyiceberg/catalog/s3tables.py:
##########
@@ -0,0 +1,324 @@
+import re
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union
+
+import boto3
+
+from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, 
PropertiesUpdateSummary
+from pyiceberg.exceptions import (
+    CommitFailedException,
+    InvalidNamespaceName,
+    InvalidTableName,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    S3TablesError,
+    TableAlreadyExistsError,
+    TableBucketNotFound,
+)
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, 
AWS_SESSION_TOKEN, load_file_io
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.table.update import TableRequirement, TableUpdate
+from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+from pyiceberg.utils.properties import get_first_property_value
+
+if TYPE_CHECKING:
+    import pyarrow as pa
+
+S3TABLES_PROFILE_NAME = "s3tables.profile-name"
+S3TABLES_REGION = "s3tables.region"
+S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id"
+S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key"
+S3TABLES_SESSION_TOKEN = "s3tables.session-token"
+
+S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn"
+
+S3TABLES_ENDPOINT = "s3tables.endpoint"
+
+# for naming rules see: 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html
+S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}")
+S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata"
+
+
+class S3TableCatalog(MetastoreCatalog):
+    def __init__(self, name: str, **properties: str):
+        super().__init__(name, **properties)
+
+        self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN]
+
+        session = boto3.Session(
+            profile_name=properties.get(S3TABLES_PROFILE_NAME),
+            region_name=get_first_property_value(properties, S3TABLES_REGION, 
AWS_REGION),
+            botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
+            aws_access_key_id=get_first_property_value(properties, 
S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            aws_secret_access_key=get_first_property_value(properties, 
S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            aws_session_token=get_first_property_value(properties, 
S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN),
+        )
+        try:
+            self.s3tables = session.client("s3tables", 
endpoint_url=properties.get(S3TABLES_ENDPOINT))
+        except boto3.session.UnknownServiceError as e:
+            raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current 
version: {boto3.__version__}.") from e
+
+        try:
+            
self.s3tables.get_table_bucket(tableBucketARN=self.table_bucket_arn)
+        except self.s3tables.exceptions.NotFoundException as e:
+            raise TableBucketNotFound(e) from e
+
+    def commit_table(
+        self, table: Table, requirements: Tuple[TableRequirement, ...], 
updates: Tuple[TableUpdate, ...]
+    ) -> CommitTableResponse:

Review Comment:
   I did not find the logic for cases when table not exist, which means 
`create_table_transaction` will not be supported in the current version.
   
https://github.com/apache/iceberg-python/blob/e41c428e852db78459890bab2c29aee9d13097b8/pyiceberg/catalog/__init__.py#L754-L765
   
   We do not have to support everything in the initial PR. But it will be good 
to override `create_table_transaction` as "Not Implemented" for the s3tables



##########
pyiceberg/catalog/s3tables.py:
##########
@@ -0,0 +1,324 @@
+import re
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union
+
+import boto3
+
+from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, 
PropertiesUpdateSummary
+from pyiceberg.exceptions import (
+    CommitFailedException,
+    InvalidNamespaceName,
+    InvalidTableName,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    S3TablesError,
+    TableAlreadyExistsError,
+    TableBucketNotFound,
+)
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, 
AWS_SESSION_TOKEN, load_file_io
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.table.update import TableRequirement, TableUpdate
+from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+from pyiceberg.utils.properties import get_first_property_value
+
+if TYPE_CHECKING:
+    import pyarrow as pa
+
+S3TABLES_PROFILE_NAME = "s3tables.profile-name"
+S3TABLES_REGION = "s3tables.region"
+S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id"
+S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key"
+S3TABLES_SESSION_TOKEN = "s3tables.session-token"
+
+S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn"
+
+S3TABLES_ENDPOINT = "s3tables.endpoint"
+
+# for naming rules see: 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html
+S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}")

Review Comment:
   ```suggestion
   S3TABLES_VALID_NAME_REGEX = pattern = 
re.compile("[a-z0-9][a-z0-9_]{1,61}[a-z0-9]")
   ```
   I think we also have to enforce names to end with a letter or a number.
   
   > Names must begin and end with a letter or number.
   
   
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html#naming-rules-table
   



##########
tests/catalog/test_s3tables.py:
##########
@@ -0,0 +1,227 @@
+import pytest

Review Comment:
   Shall we rename this to `integration_test_s3tables.py`? We use this naming 
convention for tests involved real endpoints, like `integration_test_glue.py`. 
I think it will be great to keep a version of testing against real endpoints 
even after we have moto s3tables available.



##########
pyiceberg/catalog/s3tables.py:
##########
@@ -0,0 +1,324 @@
+import re
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union
+
+import boto3
+
+from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, 
PropertiesUpdateSummary
+from pyiceberg.exceptions import (
+    CommitFailedException,
+    InvalidNamespaceName,
+    InvalidTableName,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    S3TablesError,
+    TableAlreadyExistsError,
+    TableBucketNotFound,
+)
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, 
AWS_SESSION_TOKEN, load_file_io
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.table.update import TableRequirement, TableUpdate
+from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+from pyiceberg.utils.properties import get_first_property_value
+
+if TYPE_CHECKING:
+    import pyarrow as pa
+
+S3TABLES_PROFILE_NAME = "s3tables.profile-name"
+S3TABLES_REGION = "s3tables.region"
+S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id"
+S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key"
+S3TABLES_SESSION_TOKEN = "s3tables.session-token"
+
+S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn"
+
+S3TABLES_ENDPOINT = "s3tables.endpoint"
+
+# for naming rules see: 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html
+S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}")
+S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata"
+
+
+class S3TableCatalog(MetastoreCatalog):
+    def __init__(self, name: str, **properties: str):
+        super().__init__(name, **properties)
+
+        self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN]
+
+        session = boto3.Session(
+            profile_name=properties.get(S3TABLES_PROFILE_NAME),
+            region_name=get_first_property_value(properties, S3TABLES_REGION, 
AWS_REGION),
+            botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
+            aws_access_key_id=get_first_property_value(properties, 
S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            aws_secret_access_key=get_first_property_value(properties, 
S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            aws_session_token=get_first_property_value(properties, 
S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN),
+        )
+        try:
+            self.s3tables = session.client("s3tables", 
endpoint_url=properties.get(S3TABLES_ENDPOINT))
+        except boto3.session.UnknownServiceError as e:
+            raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current 
version: {boto3.__version__}.") from e
+
+        try:
+            
self.s3tables.get_table_bucket(tableBucketARN=self.table_bucket_arn)
+        except self.s3tables.exceptions.NotFoundException as e:
+            raise TableBucketNotFound(e) from e
+
+    def commit_table(
+        self, table: Table, requirements: Tuple[TableRequirement, ...], 
updates: Tuple[TableUpdate, ...]
+    ) -> CommitTableResponse:
+        table_identifier = table.name()
+        database_name, table_name = 
self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
+
+        current_table, version_token = 
self._load_table_and_version(identifier=table_identifier)
+
+        updated_staged_table = self._update_and_stage_table(current_table, 
table_identifier, requirements, updates)
+        if current_table and updated_staged_table.metadata == 
current_table.metadata:
+            # no changes, do nothing
+            return CommitTableResponse(metadata=current_table.metadata, 
metadata_location=current_table.metadata_location)
+
+        self._write_metadata(
+            metadata=updated_staged_table.metadata,
+            io=updated_staged_table.io,
+            metadata_path=updated_staged_table.metadata_location,
+            overwrite=True,
+        )
+
+        # try to update metadata location which will fail if the versionToken 
changed meanwhile
+        try:
+            self.s3tables.update_table_metadata_location(
+                tableBucketARN=self.table_bucket_arn,
+                namespace=database_name,
+                name=table_name,
+                versionToken=version_token,
+                metadataLocation=updated_staged_table.metadata_location,
+            )
+        except self.s3tables.exceptions.ConflictException as e:
+            raise CommitFailedException(
+                f"Cannot commit {database_name}.{table_name} because of a 
concurrent update to the table version {version_token}."
+            ) from e
+        return CommitTableResponse(
+            metadata=updated_staged_table.metadata, 
metadata_location=updated_staged_table.metadata_location
+        )
+
+    def create_namespace(self, namespace: Union[str, Identifier], properties: 
Properties = EMPTY_DICT) -> None:
+        if properties:
+            raise NotImplementedError("Setting namespace properties is not 
supported.")
+        valid_namespace: str = self._validate_namespace_identifier(namespace)
+        self.s3tables.create_namespace(tableBucketARN=self.table_bucket_arn, 
namespace=[valid_namespace])
+
+    def _validate_namespace_identifier(self, namespace: Union[str, 
Identifier]) -> str:
+        namespace = self.identifier_to_database(namespace)
+
+        if not S3TABLES_VALID_NAME_REGEX.fullmatch(namespace) or namespace == 
S3TABLES_RESERVED_NAMESPACE:
+            raise InvalidNamespaceName("The specified namespace name is not 
valid.")

Review Comment:
   May be we can add more info to this error message, like the link to: 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html.
 WDYT?



##########
pyiceberg/catalog/s3tables.py:
##########
@@ -0,0 +1,324 @@
+import re
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union
+
+import boto3
+
+from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, 
PropertiesUpdateSummary
+from pyiceberg.exceptions import (
+    CommitFailedException,
+    InvalidNamespaceName,
+    InvalidTableName,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    S3TablesError,
+    TableAlreadyExistsError,
+    TableBucketNotFound,
+)
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, 
AWS_SESSION_TOKEN, load_file_io
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.table.update import TableRequirement, TableUpdate
+from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+from pyiceberg.utils.properties import get_first_property_value
+
+if TYPE_CHECKING:
+    import pyarrow as pa
+
+S3TABLES_PROFILE_NAME = "s3tables.profile-name"
+S3TABLES_REGION = "s3tables.region"
+S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id"
+S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key"
+S3TABLES_SESSION_TOKEN = "s3tables.session-token"
+
+S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn"
+
+S3TABLES_ENDPOINT = "s3tables.endpoint"
+
+# for naming rules see: 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html
+S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}")
+S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata"
+
+
+class S3TableCatalog(MetastoreCatalog):
+    def __init__(self, name: str, **properties: str):
+        super().__init__(name, **properties)
+
+        self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN]
+
+        session = boto3.Session(
+            profile_name=properties.get(S3TABLES_PROFILE_NAME),
+            region_name=get_first_property_value(properties, S3TABLES_REGION, 
AWS_REGION),
+            botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
+            aws_access_key_id=get_first_property_value(properties, 
S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            aws_secret_access_key=get_first_property_value(properties, 
S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            aws_session_token=get_first_property_value(properties, 
S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN),
+        )
+        try:
+            self.s3tables = session.client("s3tables", 
endpoint_url=properties.get(S3TABLES_ENDPOINT))
+        except boto3.session.UnknownServiceError as e:
+            raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current 
version: {boto3.__version__}.") from e
+
+        try:
+            
self.s3tables.get_table_bucket(tableBucketARN=self.table_bucket_arn)
+        except self.s3tables.exceptions.NotFoundException as e:
+            raise TableBucketNotFound(e) from e
+
+    def commit_table(
+        self, table: Table, requirements: Tuple[TableRequirement, ...], 
updates: Tuple[TableUpdate, ...]
+    ) -> CommitTableResponse:
+        table_identifier = table.name()
+        database_name, table_name = 
self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
+
+        current_table, version_token = 
self._load_table_and_version(identifier=table_identifier)
+
+        updated_staged_table = self._update_and_stage_table(current_table, 
table_identifier, requirements, updates)
+        if current_table and updated_staged_table.metadata == 
current_table.metadata:
+            # no changes, do nothing
+            return CommitTableResponse(metadata=current_table.metadata, 
metadata_location=current_table.metadata_location)
+
+        self._write_metadata(
+            metadata=updated_staged_table.metadata,
+            io=updated_staged_table.io,
+            metadata_path=updated_staged_table.metadata_location,
+            overwrite=True,
+        )
+
+        # try to update metadata location which will fail if the versionToken 
changed meanwhile
+        try:
+            self.s3tables.update_table_metadata_location(
+                tableBucketARN=self.table_bucket_arn,
+                namespace=database_name,
+                name=table_name,
+                versionToken=version_token,
+                metadataLocation=updated_staged_table.metadata_location,
+            )
+        except self.s3tables.exceptions.ConflictException as e:
+            raise CommitFailedException(
+                f"Cannot commit {database_name}.{table_name} because of a 
concurrent update to the table version {version_token}."
+            ) from e
+        return CommitTableResponse(
+            metadata=updated_staged_table.metadata, 
metadata_location=updated_staged_table.metadata_location
+        )
+
+    def create_namespace(self, namespace: Union[str, Identifier], properties: 
Properties = EMPTY_DICT) -> None:
+        if properties:
+            raise NotImplementedError("Setting namespace properties is not 
supported.")
+        valid_namespace: str = self._validate_namespace_identifier(namespace)
+        self.s3tables.create_namespace(tableBucketARN=self.table_bucket_arn, 
namespace=[valid_namespace])
+
+    def _validate_namespace_identifier(self, namespace: Union[str, 
Identifier]) -> str:
+        namespace = self.identifier_to_database(namespace)
+
+        if not S3TABLES_VALID_NAME_REGEX.fullmatch(namespace) or namespace == 
S3TABLES_RESERVED_NAMESPACE:
+            raise InvalidNamespaceName("The specified namespace name is not 
valid.")
+
+        return namespace
+
+    def _validate_database_and_table_identifier(self, identifier: Union[str, 
Identifier]) -> Tuple[str, str]:
+        namespace, table_name = 
self.identifier_to_database_and_table(identifier)
+
+        namespace = self._validate_namespace_identifier(namespace)
+
+        if not S3TABLES_VALID_NAME_REGEX.fullmatch(table_name):
+            raise InvalidTableName("The specified table name is not valid.")
+
+        return namespace, table_name
+
+    def create_table(
+        self,
+        identifier: Union[str, Identifier],
+        schema: Union[Schema, "pa.Schema"],
+        location: Optional[str] = None,
+        partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
+        sort_order: SortOrder = UNSORTED_SORT_ORDER,
+        properties: Properties = EMPTY_DICT,
+    ) -> Table:
+        namespace, table_name = 
self._validate_database_and_table_identifier(identifier)
+
+        schema: Schema = self._convert_schema_if_needed(schema)  # type: ignore
+
+        # creating a new table with S3 Tables is a two step process. We first 
have to create an S3 Table with the
+        # S3 Tables API and then write the new metadata.json to the 
warehouseLocation associated with the newly
+        # created S3 Table.
+        try:
+            self.s3tables.create_table(
+                tableBucketARN=self.table_bucket_arn, namespace=namespace, 
name=table_name, format="ICEBERG"
+            )
+        except self.s3tables.exceptions.NotFoundException as e:
+            raise NoSuchNamespaceError(f"Cannot create 
{namespace}.{table_name} because no such namespace exists.") from e
+        except self.s3tables.exceptions.ConflictException as e:
+            raise TableAlreadyExistsError(
+                f"Cannot create {namespace}.{table_name} because a table of 
the same name already exists in the namespace."
+            ) from e
+
+        response = 
self.s3tables.get_table(tableBucketARN=self.table_bucket_arn, 
namespace=namespace, name=table_name)
+        version_token = response["versionToken"]
+
+        warehouse_location = response["warehouseLocation"]
+        metadata_location = 
self._get_metadata_location(location=warehouse_location)
+        metadata = new_table_metadata(
+            location=warehouse_location,
+            schema=schema,
+            partition_spec=partition_spec,
+            sort_order=sort_order,
+            properties=properties,
+        )

Review Comment:
   I think the `warehouse_location` should be the default location if user does 
not specify the table location through the `location` parameter, but the 
current implementation does not handle the `location` parameter



##########
pyiceberg/catalog/s3tables.py:
##########
@@ -0,0 +1,324 @@
+import re
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union
+
+import boto3
+
+from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, 
PropertiesUpdateSummary
+from pyiceberg.exceptions import (
+    CommitFailedException,
+    InvalidNamespaceName,
+    InvalidTableName,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    S3TablesError,
+    TableAlreadyExistsError,
+    TableBucketNotFound,
+)
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, 
AWS_SESSION_TOKEN, load_file_io
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.table.update import TableRequirement, TableUpdate
+from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+from pyiceberg.utils.properties import get_first_property_value
+
+if TYPE_CHECKING:
+    import pyarrow as pa
+
+S3TABLES_PROFILE_NAME = "s3tables.profile-name"
+S3TABLES_REGION = "s3tables.region"
+S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id"
+S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key"
+S3TABLES_SESSION_TOKEN = "s3tables.session-token"
+
+S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn"
+
+S3TABLES_ENDPOINT = "s3tables.endpoint"
+
+# for naming rules see: 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html
+S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}")
+S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata"
+
+
+class S3TableCatalog(MetastoreCatalog):

Review Comment:
   ```suggestion
   class S3TablesCatalog(MetastoreCatalog):
   ```
   Shall we make it align with the service name "s3tables"? AWS labs also uses 
["S3TablesCatalog"](https://github.com/awslabs/s3-tables-catalog/blob/main/src/software/amazon/s3tables/iceberg/S3TablesCatalog.java)



##########
pyiceberg/catalog/s3tables.py:
##########
@@ -0,0 +1,322 @@
+import re
+from typing import TYPE_CHECKING, List, Optional, Set, Tuple, Union
+
+import boto3
+
+from pyiceberg.catalog import DEPRECATED_BOTOCORE_SESSION, MetastoreCatalog, 
PropertiesUpdateSummary
+from pyiceberg.exceptions import (
+    CommitFailedException,
+    InvalidNamespaceName,
+    InvalidTableName,
+    NamespaceNotEmptyError,
+    NoSuchNamespaceError,
+    NoSuchTableError,
+    S3TablesError,
+    TableAlreadyExistsError,
+    TableBucketNotFound,
+)
+from pyiceberg.io import AWS_ACCESS_KEY_ID, AWS_REGION, AWS_SECRET_ACCESS_KEY, 
AWS_SESSION_TOKEN, load_file_io
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.schema import Schema
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.table import CommitTableResponse, Table
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from pyiceberg.table.update import TableRequirement, TableUpdate
+from pyiceberg.typedef import EMPTY_DICT, Identifier, Properties
+from pyiceberg.utils.properties import get_first_property_value
+
+if TYPE_CHECKING:
+    import pyarrow as pa
+
+S3TABLES_PROFILE_NAME = "s3tables.profile-name"
+S3TABLES_REGION = "s3tables.region"
+S3TABLES_ACCESS_KEY_ID = "s3tables.access-key-id"
+S3TABLES_SECRET_ACCESS_KEY = "s3tables.secret-access-key"
+S3TABLES_SESSION_TOKEN = "s3tables.session-token"
+
+S3TABLES_TABLE_BUCKET_ARN = "s3tables.table-bucket-arn"
+
+S3TABLES_ENDPOINT = "s3tables.endpoint"
+
+# for naming rules see: 
https://docs.aws.amazon.com/AmazonS3/latest/userguide/s3-tables-buckets-naming.html
+S3TABLES_VALID_NAME_REGEX = pattern = re.compile("[a-z0-9][a-z0-9_]{2,62}")
+S3TABLES_RESERVED_NAMESPACE = "aws_s3_metadata"
+
+
+class S3TableCatalog(MetastoreCatalog):
+    def __init__(self, name: str, **properties: str):
+        super().__init__(name, **properties)
+
+        self.table_bucket_arn = self.properties[S3TABLES_TABLE_BUCKET_ARN]
+
+        session = boto3.Session(
+            profile_name=properties.get(S3TABLES_PROFILE_NAME),
+            region_name=get_first_property_value(properties, S3TABLES_REGION, 
AWS_REGION),
+            botocore_session=properties.get(DEPRECATED_BOTOCORE_SESSION),
+            aws_access_key_id=get_first_property_value(properties, 
S3TABLES_ACCESS_KEY_ID, AWS_ACCESS_KEY_ID),
+            aws_secret_access_key=get_first_property_value(properties, 
S3TABLES_SECRET_ACCESS_KEY, AWS_SECRET_ACCESS_KEY),
+            aws_session_token=get_first_property_value(properties, 
S3TABLES_SESSION_TOKEN, AWS_SESSION_TOKEN),
+        )
+        try:
+            self.s3tables = session.client("s3tables", 
endpoint_url=properties.get(S3TABLES_ENDPOINT))
+        except boto3.session.UnknownServiceError as e:
+            raise S3TablesError("'s3tables' requires boto3>=1.35.74. Current 
version: {boto3.__version__}.") from e

Review Comment:
   My two cents: I like this check. I do not think we should enforce dependency 
version upgrade for adding "optional" new service like s3tables, as it may 
reduce compatibility and cause additional version conflict.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to