Fokko commented on code in PR #289: URL: https://github.com/apache/iceberg-python/pull/289#discussion_r1469989182
########## pyiceberg/catalog/__init__.py: ########## @@ -137,12 +138,19 @@ def load_sql(name: str, conf: Properties) -> Catalog: raise NotInstalledError("SQLAlchemy support not installed: pip install 'pyiceberg[sql-postgres]'") from exc +def load_memory(name: str, conf: Properties) -> Catalog: + from pyiceberg.catalog.in_memory import InMemoryCatalog + + return InMemoryCatalog(name, **conf) + + AVAILABLE_CATALOGS: dict[CatalogType, Callable[[str, Properties], Catalog]] = { CatalogType.REST: load_rest, CatalogType.HIVE: load_hive, CatalogType.GLUE: load_glue, CatalogType.DYNAMODB: load_dynamodb, CatalogType.SQL: load_sql, + CatalogType.MEMORY: load_memory, Review Comment: Can you also add this one to the docs: https://py.iceberg.apache.org/configuration/ With a warning that this is just for testing purposes only. ########## pyiceberg/catalog/in_memory.py: ########## @@ -0,0 +1,222 @@ +import uuid +from typing import ( + Dict, + List, + Optional, + Set, + Union, +) + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import ( + NamespaceAlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + TableAlreadyExistsError, +) +from pyiceberg.io import WAREHOUSE +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import ( + CommitTableRequest, + CommitTableResponse, + Table, + update_table_metadata, +) +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" + + +class InMemoryCatalog(Catalog): + """An in-memory catalog implementation.""" + + __tables: Dict[Identifier, Table] + __namespaces: Dict[Identifier, Properties] + + def __init__(self, name: str, **properties: str) -> None: + super().__init__(name, **properties) + self.__tables = {} + self.__namespaces = {} + self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION + + def create_table( + self, + identifier: Union[str, Identifier], + schema: Schema, + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + table_uuid: Optional[uuid.UUID] = None, + ) -> Table: + identifier = Catalog.identifier_to_tuple(identifier) + namespace = Catalog.namespace_from(identifier) + + if identifier in self.__tables: + raise TableAlreadyExistsError(f"Table already exists: {identifier}") + else: + if namespace not in self.__namespaces: + self.__namespaces[namespace] = {} + + if not location: + location = f'{self._warehouse_location}/{"/".join(identifier)}' + + metadata_location = f'{self._warehouse_location}/{"/".join(identifier)}/metadata/metadata.json' Review Comment: It looks like we don't write the metadata here, but we write it below at the `_commit` method ########## pyiceberg/catalog/in_memory.py: ########## @@ -0,0 +1,222 @@ +import uuid +from typing import ( + Dict, + List, + Optional, + Set, + Union, +) + +from pyiceberg.catalog import ( + Catalog, + Identifier, + Properties, + PropertiesUpdateSummary, +) +from pyiceberg.exceptions import ( + NamespaceAlreadyExistsError, + NamespaceNotEmptyError, + NoSuchNamespaceError, + NoSuchTableError, + TableAlreadyExistsError, +) +from pyiceberg.io import WAREHOUSE +from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec +from pyiceberg.schema import Schema +from pyiceberg.table import ( + CommitTableRequest, + CommitTableResponse, + Table, + update_table_metadata, +) +from pyiceberg.table.metadata import new_table_metadata +from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder +from pyiceberg.typedef import EMPTY_DICT + +DEFAULT_WAREHOUSE_LOCATION = "file:///tmp/warehouse" + + +class InMemoryCatalog(Catalog): + """An in-memory catalog implementation.""" + + __tables: Dict[Identifier, Table] + __namespaces: Dict[Identifier, Properties] + + def __init__(self, name: str, **properties: str) -> None: + super().__init__(name, **properties) + self.__tables = {} + self.__namespaces = {} + self._warehouse_location = properties.get(WAREHOUSE, None) or DEFAULT_WAREHOUSE_LOCATION + + def create_table( + self, + identifier: Union[str, Identifier], + schema: Schema, + location: Optional[str] = None, + partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC, + sort_order: SortOrder = UNSORTED_SORT_ORDER, + properties: Properties = EMPTY_DICT, + table_uuid: Optional[uuid.UUID] = None, + ) -> Table: + identifier = Catalog.identifier_to_tuple(identifier) + namespace = Catalog.namespace_from(identifier) + + if identifier in self.__tables: + raise TableAlreadyExistsError(f"Table already exists: {identifier}") + else: + if namespace not in self.__namespaces: Review Comment: Other implementations don't auto-create namespaces, however I think it is fine for the InMemory one. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org