Fokko commented on code in PR #7921:
URL: https://github.com/apache/iceberg/pull/7921#discussion_r1243206801
##########
python/pyiceberg/catalog/jdbc.py:
##########
@@ -0,0 +1,475 @@
+from pyiceberg.catalog import Catalog
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Set,
+ Union,
+)
+from pyiceberg.catalog import (
+ ICEBERG,
+ METADATA_LOCATION,
+ PREVIOUS_METADATA_LOCATION,
+ TABLE_TYPE,
+ Catalog,
+ Identifier,
+ Properties,
+ PropertiesUpdateSummary,
+)
+from pyiceberg.exceptions import (
+ ConditionalCheckFailedException,
+ GenericDynamoDbError,
+ NamespaceAlreadyExistsError,
+ NamespaceNotEmptyError,
+ NoSuchIcebergTableError,
+ NoSuchNamespaceError,
+ NoSuchPropertyException,
+ NoSuchTableError,
+ TableAlreadyExistsError,
+)
+from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.typedef import EMPTY_DICT
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Set,
+ Union,
+)
+import psycopg2 as db
+from psycopg2 import Error
+from psycopg2.extras import DictCursor
+import sqlite3
+from urllib.parse import urlparse
+
+JDBC_URI = "uri"
+
+# Catalog tables
+CATALOG_TABLE_NAME = "iceberg_tables"
+CATALOG_NAME = "catalog_name"
+TABLE_NAMESPACE = "table_namespace"
+TABLE_NAME = "table_name"
+METADATA_LOCATION = "metadata_location"
+PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
+
+# Catalog SQL statements
+CREATE_CATALOG_TABLE = f"CREATE TABLE {CATALOG_TABLE_NAME} ({CATALOG_NAME}
VARCHAR(255) NOT NULL, {TABLE_NAMESPACE} VARCHAR(255) NOT NULL, {TABLE_NAME}
VARCHAR(255) NOT NULL, {METADATA_LOCATION} VARCHAR(1000),
{PREVIOUS_METADATA_LOCATION} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME},
{TABLE_NAMESPACE}, {TABLE_NAME}))"
+LIST_TABLES_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} =
%s AND {TABLE_NAMESPACE} = %s"
+GET_TABLE_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s
AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s"
+DROP_TABLE_SQL = f"DELETE FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s
AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s "
+DO_COMMIT_CREATE_TABLE_SQL = f"INSERT INTO {CATALOG_TABLE_NAME}
({CATALOG_NAME}, {TABLE_NAMESPACE} , {TABLE_NAME} , {METADATA_LOCATION},
{PREVIOUS_METADATA_LOCATION}) VALUES (%s,%s,%s,%s,null)"
+RENAME_TABLE_SQL = f"UPDATE {CATALOG_TABLE_NAME} SET {TABLE_NAMESPACE} = %s,
{TABLE_NAME} = %s WHERE {CATALOG_NAME} = %s AND {TABLE_NAMESPACE} = %s AND
{TABLE_NAME} = %s "
+
+GET_NAMESPACE_SQL = f"SELECT {TABLE_NAMESPACE} FROM {CATALOG_TABLE_NAME} WHERE
{CATALOG_NAME} = %s AND {TABLE_NAMESPACE} LIKE %s LIMIT 1"
+LIST_ALL_TABLE_NAMESPACES_SQL = f"SELECT DISTINCT {TABLE_NAMESPACE} FROM
{CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+
+# Catalog Namespace Properties
+NAMESPACE_EXISTS_PROPERTY = "exists"
+NAMESPACE_MINIMAL_PROPERTIES = {NAMESPACE_EXISTS_PROPERTY: "true"}
+NAMESPACE_PROPERTIES_TABLE_NAME = "iceberg_namespace_properties"
+NAMESPACE_NAME = "namespace"
+NAMESPACE_PROPERTY_KEY = "property_key"
+NAMESPACE_PROPERTY_VALUE = "property_value"
+
+# Catalog Namespace SQL statements
+CREATE_NAMESPACE_PROPERTIES_TABLE = f"CREATE TABLE
{NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME} VARCHAR(255) NOT NULL,
{NAMESPACE_NAME} VARCHAR(255) NOT NULL, {NAMESPACE_PROPERTY_KEY} VARCHAR(255),
{NAMESPACE_PROPERTY_VALUE} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME},
{NAMESPACE_NAME}, {NAMESPACE_PROPERTY_KEY}))"
+INSERT_NAMESPACE_PROPERTIES_SQL = f"INSERT INTO
{NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME}, {NAMESPACE_NAME},
{NAMESPACE_PROPERTY_KEY}, {NAMESPACE_PROPERTY_VALUE}) VALUES "
+INSERT_PROPERTIES_VALUES_BASE = f"(%s,%s,%s,%s)"
+LIST_ALL_PROPERTY_NAMESPACES_SQL = f"SELECT DISTINCT {NAMESPACE_NAME} FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+DELETE_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+DELETE_ALL_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s"
+UPDATE_NAMESPACE_PROPERTIES_START_SQL = f"UPDATE
{NAMESPACE_PROPERTIES_TABLE_NAME} SET {NAMESPACE_PROPERTY_VALUE} = CASE"
+UPDATE_NAMESPACE_PROPERTIES_END_SQL = f" END WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+
+
+GET_NAMESPACE_PROPERTIES_SQL = f"SELECT {NAMESPACE_NAME} FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} LIKE %s LIMIT 1"
+GET_ALL_NAMESPACE_PROPERTIES_SQL = f"SELECT * FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s"
+
+# Custom SQL not from JDBCCatalog.java
+LIST_ALL_NAMESPACES_SQL = f"""
+SELECT DISTINCT ns FROM
+(
+ SELECT {TABLE_NAMESPACE} AS ns FROM {CATALOG_TABLE_NAME}
+ WHERE {CATALOG_NAME} = %s
+ UNION
Review Comment:
I think this is already deduplicated because it is a `UNION` and not a
`UNION ALL`. That means we could drop the outer `SELECT DISTINCT`.
##########
python/pyiceberg/catalog/jdbc.py:
##########
@@ -0,0 +1,475 @@
+from pyiceberg.catalog import Catalog
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Set,
+ Union,
+)
+from pyiceberg.catalog import (
+ ICEBERG,
+ METADATA_LOCATION,
+ PREVIOUS_METADATA_LOCATION,
+ TABLE_TYPE,
+ Catalog,
+ Identifier,
+ Properties,
+ PropertiesUpdateSummary,
+)
+from pyiceberg.exceptions import (
+ ConditionalCheckFailedException,
+ GenericDynamoDbError,
+ NamespaceAlreadyExistsError,
+ NamespaceNotEmptyError,
+ NoSuchIcebergTableError,
+ NoSuchNamespaceError,
+ NoSuchPropertyException,
+ NoSuchTableError,
+ TableAlreadyExistsError,
+)
+from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.typedef import EMPTY_DICT
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Set,
+ Union,
+)
+import psycopg2 as db
+from psycopg2 import Error
+from psycopg2.extras import DictCursor
+import sqlite3
+from urllib.parse import urlparse
+
+JDBC_URI = "uri"
+
+# Catalog tables
+CATALOG_TABLE_NAME = "iceberg_tables"
+CATALOG_NAME = "catalog_name"
+TABLE_NAMESPACE = "table_namespace"
+TABLE_NAME = "table_name"
+METADATA_LOCATION = "metadata_location"
+PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
+
+# Catalog SQL statements
+CREATE_CATALOG_TABLE = f"CREATE TABLE {CATALOG_TABLE_NAME} ({CATALOG_NAME}
VARCHAR(255) NOT NULL, {TABLE_NAMESPACE} VARCHAR(255) NOT NULL, {TABLE_NAME}
VARCHAR(255) NOT NULL, {METADATA_LOCATION} VARCHAR(1000),
{PREVIOUS_METADATA_LOCATION} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME},
{TABLE_NAMESPACE}, {TABLE_NAME}))"
+LIST_TABLES_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} =
%s AND {TABLE_NAMESPACE} = %s"
+GET_TABLE_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s
AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s"
+DROP_TABLE_SQL = f"DELETE FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s
AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s "
+DO_COMMIT_CREATE_TABLE_SQL = f"INSERT INTO {CATALOG_TABLE_NAME}
({CATALOG_NAME}, {TABLE_NAMESPACE} , {TABLE_NAME} , {METADATA_LOCATION},
{PREVIOUS_METADATA_LOCATION}) VALUES (%s,%s,%s,%s,null)"
+RENAME_TABLE_SQL = f"UPDATE {CATALOG_TABLE_NAME} SET {TABLE_NAMESPACE} = %s,
{TABLE_NAME} = %s WHERE {CATALOG_NAME} = %s AND {TABLE_NAMESPACE} = %s AND
{TABLE_NAME} = %s "
+
+GET_NAMESPACE_SQL = f"SELECT {TABLE_NAMESPACE} FROM {CATALOG_TABLE_NAME} WHERE
{CATALOG_NAME} = %s AND {TABLE_NAMESPACE} LIKE %s LIMIT 1"
+LIST_ALL_TABLE_NAMESPACES_SQL = f"SELECT DISTINCT {TABLE_NAMESPACE} FROM
{CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+
+# Catalog Namespace Properties
+NAMESPACE_EXISTS_PROPERTY = "exists"
+NAMESPACE_MINIMAL_PROPERTIES = {NAMESPACE_EXISTS_PROPERTY: "true"}
+NAMESPACE_PROPERTIES_TABLE_NAME = "iceberg_namespace_properties"
+NAMESPACE_NAME = "namespace"
+NAMESPACE_PROPERTY_KEY = "property_key"
+NAMESPACE_PROPERTY_VALUE = "property_value"
+
+# Catalog Namespace SQL statements
+CREATE_NAMESPACE_PROPERTIES_TABLE = f"CREATE TABLE
{NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME} VARCHAR(255) NOT NULL,
{NAMESPACE_NAME} VARCHAR(255) NOT NULL, {NAMESPACE_PROPERTY_KEY} VARCHAR(255),
{NAMESPACE_PROPERTY_VALUE} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME},
{NAMESPACE_NAME}, {NAMESPACE_PROPERTY_KEY}))"
+INSERT_NAMESPACE_PROPERTIES_SQL = f"INSERT INTO
{NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME}, {NAMESPACE_NAME},
{NAMESPACE_PROPERTY_KEY}, {NAMESPACE_PROPERTY_VALUE}) VALUES "
+INSERT_PROPERTIES_VALUES_BASE = f"(%s,%s,%s,%s)"
+LIST_ALL_PROPERTY_NAMESPACES_SQL = f"SELECT DISTINCT {NAMESPACE_NAME} FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+DELETE_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+DELETE_ALL_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s"
+UPDATE_NAMESPACE_PROPERTIES_START_SQL = f"UPDATE
{NAMESPACE_PROPERTIES_TABLE_NAME} SET {NAMESPACE_PROPERTY_VALUE} = CASE"
+UPDATE_NAMESPACE_PROPERTIES_END_SQL = f" END WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+
+
+GET_NAMESPACE_PROPERTIES_SQL = f"SELECT {NAMESPACE_NAME} FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} LIKE %s LIMIT 1"
+GET_ALL_NAMESPACE_PROPERTIES_SQL = f"SELECT * FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s"
+
+# Custom SQL not from JDBCCatalog.java
+LIST_ALL_NAMESPACES_SQL = f"""
+SELECT DISTINCT ns FROM
+(
+ SELECT {TABLE_NAMESPACE} AS ns FROM {CATALOG_TABLE_NAME}
+ WHERE {CATALOG_NAME} = %s
+ UNION
+ SELECT {NAMESPACE_NAME} AS ns FROM {NAMESPACE_PROPERTIES_TABLE_NAME}
+ WHERE {CATALOG_NAME} = %s
+) AS all_catalog_namespaces
+"""
+
+def _sqlite(**properties: str) -> Any:
+ parsed_uri = urlparse(properties.get("uri"))
+ return sqlite3.connect(database=parsed_uri.path, uri=False)
+
+def _postgresql(**properties: str) -> Any:
+ parsed_uri = urlparse(properties.get("uri"))
+ postgresql_props = {
+ "user": parsed_uri.username,
+ "password": parsed_uri.password,
+ "dbname": parsed_uri.path[1:],
+ "host": parsed_uri.hostname,
+ "port": parsed_uri.port,
+ }
+
+ return db.connect(**postgresql_props)
+
+SCHEME_TO_DB = {
+ "file": _sqlite,
+ "postgresql": _postgresql,
+}
+
+class JDBCCatalog(Catalog):
+ def __init__(self, name: str, **properties: str):
+ super().__init__(name, **properties)
+
+ # Get a database connection for a specific scheme.
+ uri = urlparse(self.properties.get("uri"))
Review Comment:
Should we add a check to make sure that `uri` is set? Without one, a missing
`uri` produces a confusing error:
```python
Python 3.11.4 (main, Jun 20 2023, 17:23:00) [Clang 14.0.3
(clang-1403.0.22.14.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from urllib.parse import urlparse
>>> r = urlparse(None)
>>> r
ParseResultBytes(scheme=b'', netloc=b'', path=b'', params=b'', query=b'',
fragment=b'')
>>> str(r.scheme)
"b''"
>>> uri_scheme = str(r.scheme)
>>> raise ValueError(f"No registered database for scheme: {uri_scheme}")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
ValueError: No registered database for scheme: b''
```
From this error, it is not immediately obvious that `uri` isn't set.
##########
python/pyiceberg/catalog/jdbc.py:
##########
@@ -0,0 +1,475 @@
+from pyiceberg.catalog import Catalog
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Set,
+ Union,
+)
+from pyiceberg.catalog import (
+ ICEBERG,
+ METADATA_LOCATION,
+ PREVIOUS_METADATA_LOCATION,
+ TABLE_TYPE,
+ Catalog,
+ Identifier,
+ Properties,
+ PropertiesUpdateSummary,
+)
+from pyiceberg.exceptions import (
+ ConditionalCheckFailedException,
+ GenericDynamoDbError,
+ NamespaceAlreadyExistsError,
+ NamespaceNotEmptyError,
+ NoSuchIcebergTableError,
+ NoSuchNamespaceError,
+ NoSuchPropertyException,
+ NoSuchTableError,
+ TableAlreadyExistsError,
+)
+from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.typedef import EMPTY_DICT
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Set,
+ Union,
+)
+import psycopg2 as db
+from psycopg2 import Error
+from psycopg2.extras import DictCursor
+import sqlite3
+from urllib.parse import urlparse
+
+JDBC_URI = "uri"
+
+# Catalog tables
+CATALOG_TABLE_NAME = "iceberg_tables"
+CATALOG_NAME = "catalog_name"
+TABLE_NAMESPACE = "table_namespace"
+TABLE_NAME = "table_name"
+METADATA_LOCATION = "metadata_location"
+PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
+
+# Catalog SQL statements
+CREATE_CATALOG_TABLE = f"CREATE TABLE {CATALOG_TABLE_NAME} ({CATALOG_NAME}
VARCHAR(255) NOT NULL, {TABLE_NAMESPACE} VARCHAR(255) NOT NULL, {TABLE_NAME}
VARCHAR(255) NOT NULL, {METADATA_LOCATION} VARCHAR(1000),
{PREVIOUS_METADATA_LOCATION} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME},
{TABLE_NAMESPACE}, {TABLE_NAME}))"
+LIST_TABLES_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} =
%s AND {TABLE_NAMESPACE} = %s"
+GET_TABLE_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s
AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s"
+DROP_TABLE_SQL = f"DELETE FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s
AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s "
+DO_COMMIT_CREATE_TABLE_SQL = f"INSERT INTO {CATALOG_TABLE_NAME}
({CATALOG_NAME}, {TABLE_NAMESPACE} , {TABLE_NAME} , {METADATA_LOCATION},
{PREVIOUS_METADATA_LOCATION}) VALUES (%s,%s,%s,%s,null)"
+RENAME_TABLE_SQL = f"UPDATE {CATALOG_TABLE_NAME} SET {TABLE_NAMESPACE} = %s,
{TABLE_NAME} = %s WHERE {CATALOG_NAME} = %s AND {TABLE_NAMESPACE} = %s AND
{TABLE_NAME} = %s "
+
+GET_NAMESPACE_SQL = f"SELECT {TABLE_NAMESPACE} FROM {CATALOG_TABLE_NAME} WHERE
{CATALOG_NAME} = %s AND {TABLE_NAMESPACE} LIKE %s LIMIT 1"
+LIST_ALL_TABLE_NAMESPACES_SQL = f"SELECT DISTINCT {TABLE_NAMESPACE} FROM
{CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+
+# Catalog Namespace Properties
+NAMESPACE_EXISTS_PROPERTY = "exists"
+NAMESPACE_MINIMAL_PROPERTIES = {NAMESPACE_EXISTS_PROPERTY: "true"}
+NAMESPACE_PROPERTIES_TABLE_NAME = "iceberg_namespace_properties"
+NAMESPACE_NAME = "namespace"
+NAMESPACE_PROPERTY_KEY = "property_key"
+NAMESPACE_PROPERTY_VALUE = "property_value"
+
+# Catalog Namespace SQL statements
+CREATE_NAMESPACE_PROPERTIES_TABLE = f"CREATE TABLE
{NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME} VARCHAR(255) NOT NULL,
{NAMESPACE_NAME} VARCHAR(255) NOT NULL, {NAMESPACE_PROPERTY_KEY} VARCHAR(255),
{NAMESPACE_PROPERTY_VALUE} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME},
{NAMESPACE_NAME}, {NAMESPACE_PROPERTY_KEY}))"
+INSERT_NAMESPACE_PROPERTIES_SQL = f"INSERT INTO
{NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME}, {NAMESPACE_NAME},
{NAMESPACE_PROPERTY_KEY}, {NAMESPACE_PROPERTY_VALUE}) VALUES "
+INSERT_PROPERTIES_VALUES_BASE = f"(%s,%s,%s,%s)"
+LIST_ALL_PROPERTY_NAMESPACES_SQL = f"SELECT DISTINCT {NAMESPACE_NAME} FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+DELETE_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+DELETE_ALL_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s"
+UPDATE_NAMESPACE_PROPERTIES_START_SQL = f"UPDATE
{NAMESPACE_PROPERTIES_TABLE_NAME} SET {NAMESPACE_PROPERTY_VALUE} = CASE"
+UPDATE_NAMESPACE_PROPERTIES_END_SQL = f" END WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+
+
+GET_NAMESPACE_PROPERTIES_SQL = f"SELECT {NAMESPACE_NAME} FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} LIKE %s LIMIT 1"
+GET_ALL_NAMESPACE_PROPERTIES_SQL = f"SELECT * FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s"
+
+# Custom SQL not from JDBCCatalog.java
+LIST_ALL_NAMESPACES_SQL = f"""
+SELECT DISTINCT ns FROM
+(
+ SELECT {TABLE_NAMESPACE} AS ns FROM {CATALOG_TABLE_NAME}
+ WHERE {CATALOG_NAME} = %s
+ UNION
+ SELECT {NAMESPACE_NAME} AS ns FROM {NAMESPACE_PROPERTIES_TABLE_NAME}
+ WHERE {CATALOG_NAME} = %s
+) AS all_catalog_namespaces
+"""
+
+def _sqlite(**properties: str) -> Any:
+ parsed_uri = urlparse(properties.get("uri"))
+ return sqlite3.connect(database=parsed_uri.path, uri=False)
+
+def _postgresql(**properties: str) -> Any:
+ parsed_uri = urlparse(properties.get("uri"))
+ postgresql_props = {
+ "user": parsed_uri.username,
+ "password": parsed_uri.password,
+ "dbname": parsed_uri.path[1:],
+ "host": parsed_uri.hostname,
+ "port": parsed_uri.port,
+ }
+
+ return db.connect(**postgresql_props)
+
+SCHEME_TO_DB = {
+ "file": _sqlite,
+ "postgresql": _postgresql,
+}
+
+class JDBCCatalog(Catalog):
+ def __init__(self, name: str, **properties: str):
+ super().__init__(name, **properties)
+
+ # Get a database connection for a specific scheme.
+ uri = urlparse(self.properties.get("uri"))
+ uri_scheme = str(uri.scheme)
+ if uri_scheme not in SCHEME_TO_DB:
+ raise ValueError(f"No registered database for scheme:
{uri_scheme}")
+ self._get_db_connection = SCHEME_TO_DB[uri_scheme]
+
+ def initialize_catalog_tables(self) -> None:
+ with self._get_db_connection(**self.properties) as conn:
+ try:
+ curs = conn.cursor()
+ curs.execute(CREATE_CATALOG_TABLE)
+ curs.execute(CREATE_NAMESPACE_PROPERTIES_TABLE)
+ conn.commit()
+ finally:
+ curs.close()
+
+
+ def _convert_jdbc_to_iceberg(self, jdbc_row: Dict[str, str]) -> Table:
+ # properties: Dict[str, str] = table.parameters
+ # if TABLE_TYPE not in properties:
+ # raise NoSuchTableError(f"Property table_type missing, could not
determine type: {table.dbName}.{table.tableName}")
+
+ # table_type = properties[TABLE_TYPE]
+ # if table_type.lower() != ICEBERG:
+ # raise NoSuchIcebergTableError(
+ # f"Property table_type is {table_type}, expected {ICEBERG}:
{table.dbName}.{table.tableName}"
+ # )
+
+ # if prop_metadata_location := properties.get(METADATA_LOCATION):
+ # metadata_location = prop_metadata_location
+ # else:
+ # raise NoSuchTableError(f"Table property {METADATA_LOCATION} is
missing")
+ metadata_location = jdbc_row[METADATA_LOCATION]
Review Comment:
We probably want a better error message here, like the one in the
commented-out code on the lines above.
##########
python/pyiceberg/catalog/jdbc.py:
##########
@@ -0,0 +1,475 @@
+from pyiceberg.catalog import Catalog
+from pyiceberg.schema import Schema
+from pyiceberg.table import Table
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Set,
+ Union,
+)
+from pyiceberg.catalog import (
+ ICEBERG,
+ METADATA_LOCATION,
+ PREVIOUS_METADATA_LOCATION,
+ TABLE_TYPE,
+ Catalog,
+ Identifier,
+ Properties,
+ PropertiesUpdateSummary,
+)
+from pyiceberg.exceptions import (
+ ConditionalCheckFailedException,
+ GenericDynamoDbError,
+ NamespaceAlreadyExistsError,
+ NamespaceNotEmptyError,
+ NoSuchIcebergTableError,
+ NoSuchNamespaceError,
+ NoSuchPropertyException,
+ NoSuchTableError,
+ TableAlreadyExistsError,
+)
+from pyiceberg.io import FileIO, load_file_io
+from pyiceberg.serializers import FromInputFile
+from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
+from pyiceberg.typedef import EMPTY_DICT
+from pyiceberg.table.metadata import new_table_metadata
+from pyiceberg.table.sorting import UNSORTED_SORT_ORDER, SortOrder
+from typing import (
+ Any,
+ Dict,
+ List,
+ Optional,
+ Set,
+ Union,
+)
+import psycopg2 as db
+from psycopg2 import Error
+from psycopg2.extras import DictCursor
+import sqlite3
+from urllib.parse import urlparse
+
+JDBC_URI = "uri"
+
+# Catalog tables
+CATALOG_TABLE_NAME = "iceberg_tables"
+CATALOG_NAME = "catalog_name"
+TABLE_NAMESPACE = "table_namespace"
+TABLE_NAME = "table_name"
+METADATA_LOCATION = "metadata_location"
+PREVIOUS_METADATA_LOCATION = "previous_metadata_location"
+
+# Catalog SQL statements
+CREATE_CATALOG_TABLE = f"CREATE TABLE {CATALOG_TABLE_NAME} ({CATALOG_NAME}
VARCHAR(255) NOT NULL, {TABLE_NAMESPACE} VARCHAR(255) NOT NULL, {TABLE_NAME}
VARCHAR(255) NOT NULL, {METADATA_LOCATION} VARCHAR(1000),
{PREVIOUS_METADATA_LOCATION} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME},
{TABLE_NAMESPACE}, {TABLE_NAME}))"
+LIST_TABLES_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} =
%s AND {TABLE_NAMESPACE} = %s"
+GET_TABLE_SQL = f"SELECT * FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s
AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s"
+DROP_TABLE_SQL = f"DELETE FROM {CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s
AND {TABLE_NAMESPACE} = %s AND {TABLE_NAME} = %s "
+DO_COMMIT_CREATE_TABLE_SQL = f"INSERT INTO {CATALOG_TABLE_NAME}
({CATALOG_NAME}, {TABLE_NAMESPACE} , {TABLE_NAME} , {METADATA_LOCATION},
{PREVIOUS_METADATA_LOCATION}) VALUES (%s,%s,%s,%s,null)"
+RENAME_TABLE_SQL = f"UPDATE {CATALOG_TABLE_NAME} SET {TABLE_NAMESPACE} = %s,
{TABLE_NAME} = %s WHERE {CATALOG_NAME} = %s AND {TABLE_NAMESPACE} = %s AND
{TABLE_NAME} = %s "
+
+GET_NAMESPACE_SQL = f"SELECT {TABLE_NAMESPACE} FROM {CATALOG_TABLE_NAME} WHERE
{CATALOG_NAME} = %s AND {TABLE_NAMESPACE} LIKE %s LIMIT 1"
+LIST_ALL_TABLE_NAMESPACES_SQL = f"SELECT DISTINCT {TABLE_NAMESPACE} FROM
{CATALOG_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+
+# Catalog Namespace Properties
+NAMESPACE_EXISTS_PROPERTY = "exists"
+NAMESPACE_MINIMAL_PROPERTIES = {NAMESPACE_EXISTS_PROPERTY: "true"}
+NAMESPACE_PROPERTIES_TABLE_NAME = "iceberg_namespace_properties"
+NAMESPACE_NAME = "namespace"
+NAMESPACE_PROPERTY_KEY = "property_key"
+NAMESPACE_PROPERTY_VALUE = "property_value"
+
+# Catalog Namespace SQL statements
+CREATE_NAMESPACE_PROPERTIES_TABLE = f"CREATE TABLE
{NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME} VARCHAR(255) NOT NULL,
{NAMESPACE_NAME} VARCHAR(255) NOT NULL, {NAMESPACE_PROPERTY_KEY} VARCHAR(255),
{NAMESPACE_PROPERTY_VALUE} VARCHAR(1000), PRIMARY KEY ({CATALOG_NAME},
{NAMESPACE_NAME}, {NAMESPACE_PROPERTY_KEY}))"
+INSERT_NAMESPACE_PROPERTIES_SQL = f"INSERT INTO
{NAMESPACE_PROPERTIES_TABLE_NAME} ({CATALOG_NAME}, {NAMESPACE_NAME},
{NAMESPACE_PROPERTY_KEY}, {NAMESPACE_PROPERTY_VALUE}) VALUES "
+INSERT_PROPERTIES_VALUES_BASE = f"(%s,%s,%s,%s)"
+LIST_ALL_PROPERTY_NAMESPACES_SQL = f"SELECT DISTINCT {NAMESPACE_NAME} FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s"
+DELETE_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+DELETE_ALL_NAMESPACE_PROPERTIES_SQL = f"DELETE FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s"
+UPDATE_NAMESPACE_PROPERTIES_START_SQL = f"UPDATE
{NAMESPACE_PROPERTIES_TABLE_NAME} SET {NAMESPACE_PROPERTY_VALUE} = CASE"
+UPDATE_NAMESPACE_PROPERTIES_END_SQL = f" END WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s AND {NAMESPACE_PROPERTY_KEY} IN "
+
+
+GET_NAMESPACE_PROPERTIES_SQL = f"SELECT {NAMESPACE_NAME} FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} LIKE %s LIMIT 1"
+GET_ALL_NAMESPACE_PROPERTIES_SQL = f"SELECT * FROM
{NAMESPACE_PROPERTIES_TABLE_NAME} WHERE {CATALOG_NAME} = %s AND
{NAMESPACE_NAME} = %s"
+
+# Custom SQL not from JDBCCatalog.java
+LIST_ALL_NAMESPACES_SQL = f"""
+SELECT DISTINCT ns FROM
+(
+ SELECT {TABLE_NAMESPACE} AS ns FROM {CATALOG_TABLE_NAME}
+ WHERE {CATALOG_NAME} = %s
+ UNION
+ SELECT {NAMESPACE_NAME} AS ns FROM {NAMESPACE_PROPERTIES_TABLE_NAME}
+ WHERE {CATALOG_NAME} = %s
+) AS all_catalog_namespaces
+"""
+
+def _sqlite(**properties: str) -> Any:
+ parsed_uri = urlparse(properties.get("uri"))
+ return sqlite3.connect(database=parsed_uri.path, uri=False)
+
+def _postgresql(**properties: str) -> Any:
+ parsed_uri = urlparse(properties.get("uri"))
+ postgresql_props = {
+ "user": parsed_uri.username,
+ "password": parsed_uri.password,
+ "dbname": parsed_uri.path[1:],
+ "host": parsed_uri.hostname,
+ "port": parsed_uri.port,
+ }
+
+ return db.connect(**postgresql_props)
+
+SCHEME_TO_DB = {
+ "file": _sqlite,
+ "postgresql": _postgresql,
+}
+
+class JDBCCatalog(Catalog):
+ def __init__(self, name: str, **properties: str):
+ super().__init__(name, **properties)
+
+ # Get a database connection for a specific scheme.
+ uri = urlparse(self.properties.get("uri"))
+ uri_scheme = str(uri.scheme)
+ if uri_scheme not in SCHEME_TO_DB:
+ raise ValueError(f"No registered database for scheme:
{uri_scheme}")
+ self._get_db_connection = SCHEME_TO_DB[uri_scheme]
+
+ def initialize_catalog_tables(self) -> None:
+ with self._get_db_connection(**self.properties) as conn:
+ try:
+ curs = conn.cursor()
Review Comment:
I'm a big fan of context managers. How do you feel about using `with
conn.cursor() as curs:`?
##########
python/pyproject.toml:
##########
@@ -61,6 +61,7 @@ thrift = { version = ">=0.13.0,<1.0.0", optional = true }
boto3 = { version = ">=1.17.106", optional = true }
s3fs = { version = ">=2021.08.0,<2024.1.0", optional = true } # Upper bound
set arbitrarily, to be reassessed in early 2024.
adlfs = { version = ">=2021.07.0,<2024.1.0", optional = true } # Upper bound
set arbitrarily, to be reassessed in early 2024.
+psycopg2-binary = "^2.9.6"
Review Comment:
I think we want to make this one optional as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]