Xuanwo commented on code in PR #503:
URL: https://github.com/apache/iceberg-rust/pull/503#discussion_r1702852669


##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,2111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
+use iceberg::table::Table;
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow};
+use sqlx::{AnyPool, Row};
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+/// namespace `location` property
+const LOCATION: &str = "location";
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+static TABLE_RECORD_TYPE: &str = "TABLE";
+
+static NAMESPACE_PROPERTIES_TABLE_NAME: &str = "iceberg_namespace_properties";
+static NAMESPACE_NAME: &str = "namespace";
+static NAMESPACE_PROPERTY_KEY: &str = "property_key";
+static NAMESPACE_PROPERTY_VALUE: &str = "property_value";
+
+static MAX_CONNECTIONS: u32 = 10;
+static IDLE_TIMEOUT: u64 = 10;
+static TEST_BEFORE_ACQUIRE: bool = true;
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    uri: String,
+    name: String,
+    warehouse_location: Option<String>,
+    file_io: FileIO,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    warehouse_location: Option<String>,
+    fileio: FileIO,
+    backend: DatabaseType,
+}
+
+#[derive(Debug, PartialEq)]
+enum DatabaseType {
+    PostgreSQL,
+    MySQL,
+    SQLite,
+}

Review Comment:
   How about removing `DatabaseType`? We only need to handle the PostgreSQL 
cases, so we can add its name as a constant.
   
   In this way, we don't need to decide which database we want to support.



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,2111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
+use iceberg::table::Table;
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow};
+use sqlx::{AnyPool, Row};
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+/// namespace `location` property
+const LOCATION: &str = "location";
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+static TABLE_RECORD_TYPE: &str = "TABLE";
+
+static NAMESPACE_PROPERTIES_TABLE_NAME: &str = "iceberg_namespace_properties";
+static NAMESPACE_NAME: &str = "namespace";
+static NAMESPACE_PROPERTY_KEY: &str = "property_key";
+static NAMESPACE_PROPERTY_VALUE: &str = "property_value";
+
+static MAX_CONNECTIONS: u32 = 10;
+static IDLE_TIMEOUT: u64 = 10;
+static TEST_BEFORE_ACQUIRE: bool = true;
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    uri: String,
+    name: String,
+    warehouse_location: Option<String>,
+    file_io: FileIO,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    warehouse_location: Option<String>,
+    fileio: FileIO,
+    backend: DatabaseType,
+}
+
+#[derive(Debug, PartialEq)]
+enum DatabaseType {
+    PostgreSQL,
+    MySQL,
+    SQLite,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(MAX_CONNECTIONS);
+        let idle_timeout: u64 = config
+            .props
+            .get("pool.idle-timeout")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(IDLE_TIMEOUT);
+        let test_before_acquire: bool = config
+            .props
+            .get("pool.test-before-acquire")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(TEST_BEFORE_ACQUIRE);
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.uri)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        let conn = pool.acquire().await.map_err(from_sqlx_error)?;
+
+        let db_type = match conn.backend_name() {
+            "PostgreSQL" => DatabaseType::PostgreSQL,
+            "MySQL" => DatabaseType::MySQL,
+            "SQLite" => DatabaseType::SQLite,
+            _ => DatabaseType::SQLite,
+        };
+
+        sqlx::query(
+            &format!("create table if not exists {} ({} varchar(255) not null, 
{} varchar(255) not null, {} varchar(255) not null, {} varchar(255), {} 
varchar(255), {} varchar(5), primary key ({}, {}, {}))", 

Review Comment:
   I'm not familiar with the Java or Python implementations. Are they the same?
   
   
   Python:
   
   
https://github.com/apache/iceberg-python/blob/6c0d307032608967ccd00cfe72d8815e6e7e01cc/pyiceberg/catalog/sql.py#L80-L98
   
   Java: 
   
   
https://github.com/apache/iceberg/blob/b17d1c9abdb8fbd668ac02194cadd6003c3e37f7/core/src/main/java/org/apache/iceberg/jdbc/JdbcUtil.java#L125-L146



##########
crates/catalog/sql/Cargo.toml:
##########
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-catalog-sql"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Rust Sql Catalog"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "sql", "catalog"]
+
+[dependencies]
+async-trait = { workspace = true }
+iceberg = { workspace = true }
+serde_json = { workspace = true }
+sqlx = { version = "0.7.4", features = ["tls-rustls", "any" ], 
default-features = false }
+typed-builder = { workspace = true }
+uuid = { workspace = true, features = ["v4"] }
+
+[dev-dependencies]
+iceberg_test_utils = { path = "../../test_utils", features = ["tests"] }
+itertools = { workspace = true }
+regex = "1.10.5"
+sqlx = { version = "0.7.4", features = ["tls-rustls", "runtime-tokio", "any", 
"sqlite", "migrate"], default-features = false }
+tempfile = { workspace = true }
+tokio = { workspace = true }
+
+[features]
+sqlite = ["sqlx/sqlite"]
+postgres = ["sqlx/postgres"]
+mysql = ["sqlx/mysql"]

Review Comment:
   Hi, I'm considering whether it's possible to avoid providing features 
ourselves. Instead, users can activate whatever they want by adding `sqlx` to 
their dependencies:
   
   ```toml
   iceberg_catalog_sql = "0.x"
   sqlx = { version = "0.7.4", features = ["sqlite", "postgres"] }
   ```
   
   So we only depend on the interface of sqlx. Users can pick whatever 
driver they want (even ones they implement themselves).



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,2111 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::time::Duration;
+
+use async_trait::async_trait;
+use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
+use iceberg::table::Table;
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, 
TableCreation,
+    TableIdent,
+};
+use sqlx::any::{install_default_drivers, AnyPoolOptions, AnyRow};
+use sqlx::{AnyPool, Row};
+use typed_builder::TypedBuilder;
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+/// namespace `location` property
+const LOCATION: &str = "location";
+
+static CATALOG_TABLE_VIEW_NAME: &str = "iceberg_tables";
+static CATALOG_NAME: &str = "catalog_name";
+static TABLE_NAME: &str = "table_name";
+static TABLE_NAMESPACE: &str = "table_namespace";
+static METADATA_LOCATION_PROP: &str = "metadata_location";
+static PREVIOUS_METADATA_LOCATION_PROP: &str = "previous_metadata_location";
+static RECORD_TYPE: &str = "iceberg_type";
+static TABLE_RECORD_TYPE: &str = "TABLE";
+
+static NAMESPACE_PROPERTIES_TABLE_NAME: &str = "iceberg_namespace_properties";
+static NAMESPACE_NAME: &str = "namespace";
+static NAMESPACE_PROPERTY_KEY: &str = "property_key";
+static NAMESPACE_PROPERTY_VALUE: &str = "property_value";
+
+static MAX_CONNECTIONS: u32 = 10;
+static IDLE_TIMEOUT: u64 = 10;
+static TEST_BEFORE_ACQUIRE: bool = true;
+
+/// Sql catalog config
+#[derive(Debug, TypedBuilder)]
+pub struct SqlCatalogConfig {
+    uri: String,
+    name: String,
+    warehouse_location: Option<String>,
+    file_io: FileIO,
+    #[builder(default)]
+    props: HashMap<String, String>,
+}
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: AnyPool,
+    warehouse_location: Option<String>,
+    fileio: FileIO,
+    backend: DatabaseType,
+}
+
+#[derive(Debug, PartialEq)]
+enum DatabaseType {
+    PostgreSQL,
+    MySQL,
+    SQLite,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(config: SqlCatalogConfig) -> Result<Self> {
+        install_default_drivers();
+        let max_connections: u32 = config
+            .props
+            .get("pool.max-connections")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(MAX_CONNECTIONS);
+        let idle_timeout: u64 = config
+            .props
+            .get("pool.idle-timeout")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(IDLE_TIMEOUT);
+        let test_before_acquire: bool = config
+            .props
+            .get("pool.test-before-acquire")
+            .map(|v| v.parse().unwrap())
+            .unwrap_or(TEST_BEFORE_ACQUIRE);
+
+        let pool = AnyPoolOptions::new()
+            .max_connections(max_connections)
+            .idle_timeout(Duration::from_secs(idle_timeout))
+            .test_before_acquire(test_before_acquire)
+            .connect(&config.uri)
+            .await
+            .map_err(from_sqlx_error)?;
+
+        let conn = pool.acquire().await.map_err(from_sqlx_error)?;
+
+        let db_type = match conn.backend_name() {
+            "PostgreSQL" => DatabaseType::PostgreSQL,
+            "MySQL" => DatabaseType::MySQL,
+            "SQLite" => DatabaseType::SQLite,
+            _ => DatabaseType::SQLite,
+        };
+
+        sqlx::query(
+            &format!("create table if not exists {} ({} varchar(255) not null, 
{} varchar(255) not null, {} varchar(255) not null, {} varchar(255), {} 
varchar(255), {} varchar(5), primary key ({}, {}, {}))", 
+            CATALOG_TABLE_VIEW_NAME,
+            CATALOG_NAME,
+            TABLE_NAMESPACE,
+            TABLE_NAME,
+            METADATA_LOCATION_PROP,
+            PREVIOUS_METADATA_LOCATION_PROP,
+            RECORD_TYPE,
+            CATALOG_NAME,
+            TABLE_NAMESPACE,
+            TABLE_NAME),
+        )
+        .execute(&pool)
+        .await
+        .map_err(from_sqlx_error)?;
+
+        sqlx::query(
+            &format!("create table if not exists {} ({} varchar(255) not null, 
{} varchar(255) not null, {} varchar(255), {} varchar(255), primary key ({}, 
{}, {}))",

Review Comment:
   How about formatting the strings in a SQL style to make them easier to read?
   
   ```rust
   format!("
   create table if not exists {NAMESPACE_PROPERTIES_TABLE_NAME} (
       {CATALOG_NAME} varchar(255) not null,
       ....
       primary key (
           {CATALOG_NAME}, 
           {NAMESPACE_NAME}, 
           {NAMESPACE_PROPERTY_KEY}
       )
   ")
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to