liurenjie1024 commented on code in PR #229:
URL: https://github.com/apache/iceberg-rust/pull/229#discussion_r1513742759


##########
crates/catalog/sql/src/lib.rs:
##########
@@ -0,0 +1,24 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Iceberg REST API implementation.

Review Comment:
   ```suggestion
   //! Iceberg sql catalog implementation.
   ```



##########
crates/iceberg/src/spec/table_metadata.rs:
##########
@@ -44,21 +49,27 @@ pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
 /// Reference to [`TableMetadata`].
 pub type TableMetadataRef = Arc<TableMetadata>;
 
-#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
+#[derive(Debug, PartialEq, Deserialize, Eq, Clone, TypedBuilder)]

Review Comment:
   We had a discussion in
[this pr](https://github.com/apache/iceberg-rust/pull/62) about the table
metadata builder. I have concerns about this derived builder since it's
error-prone and not easy to review. `TableMetadataBuilder` will be heavily used
by the transaction API, and we will need to do a lot of checks in it. I would
suggest maintaining this struct manually; what do you think?



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{lock::Mutex, AsyncReadExt, AsyncWriteExt};
+use sqlx::{
+    any::{install_default_drivers, AnyConnectOptions, AnyRow},
+    AnyConnection, ConnectOptions, Connection, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO, spec::TableMetadata, table::Table, Catalog, Error, ErrorKind, 
Namespace,
+    NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
+};
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: Arc<Mutex<AnyConnection>>,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(url: &str, name: &str, storage: FileIO) -> Result<Self> {
+        install_default_drivers();
+
+        let mut connection = AnyConnectOptions::connect(
+            
&AnyConnectOptions::from_url(&url.try_into()?).map_err(from_sqlx_error)?,
+        )
+        .await
+        .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists iceberg_tables (
+                                catalog_name text not null,
+                                table_namespace text not null,
+                                table_name text not null,
+                                metadata_location text not null,
+                                previous_metadata_location text,
+                                primary key (catalog_name, table_namespace, 
table_name)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists 
iceberg_namespace_properties (
+                                catalog_name text not null,
+                                namespace text not null,
+                                property_key text,
+                                property_value text,
+                                primary key (catalog_name, namespace, 
property_key)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        Ok(SqlCatalog {
+            name: name.to_owned(),
+            connection: Arc::new(Mutex::new(connection)),
+            storage,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let name = self.name.clone();
+            Box::pin(async move {
+            sqlx::query(&format!("select distinct table_namespace from 
iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await
+        })}).await.map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    NamespaceIdent::from_vec(
+                        
y.split('.').map(ToString::to_string).collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn create_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        todo!()
+    }
+
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        todo!()
+    }
+
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let name = self.name.clone();
+            let namespace = namespace.encode_in_url();
+            Box::pin(async move {
+            sqlx::query(&format!("select table_namespace, table_name, 
metadata_location, previous_metadata_location from iceberg_tables where 
catalog_name = '{}' and table_namespace = '{}';",&name, 
&namespace)).fetch_all(&mut **txn).await
+        })}).await.map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(query_map);
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    let namespace = NamespaceIdent::from_vec(
+                        y.table_namespace
+                            .split('.')
+                            .map(ToString::to_string)
+                            .collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))?;
+                    Ok(TableIdent::new(namespace, y.table_name))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn stat_table(&self, identifier: &TableIdent) -> Result<bool> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let catalog_name = self.name.clone();
+            let namespace = identifier.namespace().encode_in_url();

Review Comment:
   This is incorrect: a namespace encoded in a URL is different from one
encoded as a normal string. In a URL each component is concatenated by '%20',
while in a normal string it's concatenated by '.'.



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{lock::Mutex, AsyncReadExt, AsyncWriteExt};
+use sqlx::{
+    any::{install_default_drivers, AnyConnectOptions, AnyRow},
+    AnyConnection, ConnectOptions, Connection, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO, spec::TableMetadata, table::Table, Catalog, Error, ErrorKind, 
Namespace,
+    NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
+};
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: Arc<Mutex<AnyConnection>>,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(url: &str, name: &str, storage: FileIO) -> Result<Self> {
+        install_default_drivers();
+
+        let mut connection = AnyConnectOptions::connect(
+            
&AnyConnectOptions::from_url(&url.try_into()?).map_err(from_sqlx_error)?,
+        )
+        .await
+        .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists iceberg_tables (
+                                catalog_name text not null,
+                                table_namespace text not null,
+                                table_name text not null,
+                                metadata_location text not null,
+                                previous_metadata_location text,
+                                primary key (catalog_name, table_namespace, 
table_name)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists 
iceberg_namespace_properties (
+                                catalog_name text not null,
+                                namespace text not null,
+                                property_key text,
+                                property_value text,
+                                primary key (catalog_name, namespace, 
property_key)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        Ok(SqlCatalog {
+            name: name.to_owned(),
+            connection: Arc::new(Mutex::new(connection)),
+            storage,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let name = self.name.clone();
+            Box::pin(async move {
+            sqlx::query(&format!("select distinct table_namespace from 
iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await
+        })}).await.map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    NamespaceIdent::from_vec(
+                        
y.split('.').map(ToString::to_string).collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn create_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        todo!()
+    }
+
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        todo!()
+    }
+
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let name = self.name.clone();
+            let namespace = namespace.encode_in_url();
+            Box::pin(async move {
+            sqlx::query(&format!("select table_namespace, table_name, 
metadata_location, previous_metadata_location from iceberg_tables where 
catalog_name = '{}' and table_namespace = '{}';",&name, 
&namespace)).fetch_all(&mut **txn).await
+        })}).await.map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(query_map);
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    let namespace = NamespaceIdent::from_vec(
+                        y.table_namespace
+                            .split('.')
+                            .map(ToString::to_string)
+                            .collect::<Vec<_>>(),

Review Comment:
   How about extracting this into a common method on `NamespaceIdent`?



##########
crates/catalog/sql/Cargo.toml:
##########
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+[package]
+name = "iceberg-catalog-sql"
+version = { workspace = true }
+edition = { workspace = true }
+homepage = { workspace = true }
+rust-version = { workspace = true }
+
+categories = ["database"]
+description = "Apache Iceberg Rust Sql Catalog"
+repository = { workspace = true }
+license = { workspace = true }
+keywords = ["iceberg", "sql", "catalog"]
+
+[dependencies]
+anyhow = "1"
+async-trait = { workspace = true }
+chrono = { workspace = true }
+dashmap = "5.5.3"
+futures = { workspace = true }
+iceberg = { workspace = true }
+log = "0.4.20"
+opendal = { workspace = true }
+serde = { workspace = true }
+serde_derive = { workspace = true }
+serde_json = { workspace = true }
+sqlx = { version = "0.7.2", features = ["runtime-tokio-rustls", "any", 
"sqlite", "postgres", "mysql"], default-features = false }

Review Comment:
   +1, we should let it be runtime agnostic.



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{lock::Mutex, AsyncReadExt, AsyncWriteExt};
+use sqlx::{
+    any::{install_default_drivers, AnyConnectOptions, AnyRow},
+    AnyConnection, ConnectOptions, Connection, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO, spec::TableMetadata, table::Table, Catalog, Error, ErrorKind, 
Namespace,
+    NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
+};
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: Arc<Mutex<AnyConnection>>,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(url: &str, name: &str, storage: FileIO) -> Result<Self> {
+        install_default_drivers();
+
+        let mut connection = AnyConnectOptions::connect(
+            
&AnyConnectOptions::from_url(&url.try_into()?).map_err(from_sqlx_error)?,
+        )
+        .await
+        .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists iceberg_tables (
+                                catalog_name text not null,
+                                table_namespace text not null,
+                                table_name text not null,
+                                metadata_location text not null,
+                                previous_metadata_location text,
+                                primary key (catalog_name, table_namespace, 
table_name)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists 
iceberg_namespace_properties (
+                                catalog_name text not null,
+                                namespace text not null,
+                                property_key text,
+                                property_value text,
+                                primary key (catalog_name, namespace, 
property_key)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        Ok(SqlCatalog {
+            name: name.to_owned(),
+            connection: Arc::new(Mutex::new(connection)),
+            storage,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let name = self.name.clone();
+            Box::pin(async move {
+            sqlx::query(&format!("select distinct table_namespace from 
iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await

Review Comment:
   Good point, it's better to use a prepared statement here.



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{lock::Mutex, AsyncReadExt, AsyncWriteExt};
+use sqlx::{
+    any::{install_default_drivers, AnyConnectOptions, AnyRow},
+    AnyConnection, ConnectOptions, Connection, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO, spec::TableMetadata, table::Table, Catalog, Error, ErrorKind, 
Namespace,
+    NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
+};
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: Arc<Mutex<AnyConnection>>,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(url: &str, name: &str, storage: FileIO) -> Result<Self> {
+        install_default_drivers();
+
+        let mut connection = AnyConnectOptions::connect(
+            
&AnyConnectOptions::from_url(&url.try_into()?).map_err(from_sqlx_error)?,
+        )
+        .await
+        .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists iceberg_tables (
+                                catalog_name text not null,
+                                table_namespace text not null,
+                                table_name text not null,
+                                metadata_location text not null,
+                                previous_metadata_location text,
+                                primary key (catalog_name, table_namespace, 
table_name)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists 
iceberg_namespace_properties (
+                                catalog_name text not null,
+                                namespace text not null,
+                                property_key text,
+                                property_value text,
+                                primary key (catalog_name, namespace, 
property_key)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        Ok(SqlCatalog {
+            name: name.to_owned(),
+            connection: Arc::new(Mutex::new(connection)),
+            storage,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let name = self.name.clone();
+            Box::pin(async move {
+            sqlx::query(&format!("select distinct table_namespace from 
iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await
+        })}).await.map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(|row| row.try_get::<String, _>(0));
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    NamespaceIdent::from_vec(
+                        
y.split('.').map(ToString::to_string).collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn create_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        todo!()
+    }
+
+    async fn get_namespace(&self, _namespace: &NamespaceIdent) -> 
Result<Namespace> {
+        todo!()
+    }
+
+    async fn namespace_exists(&self, _namespace: &NamespaceIdent) -> 
Result<bool> {
+        todo!()
+    }
+
+    async fn update_namespace(
+        &self,
+        _namespace: &NamespaceIdent,
+        _properties: HashMap<String, String>,
+    ) -> Result<()> {
+        todo!()
+    }
+
+    async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn list_tables(&self, namespace: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let name = self.name.clone();
+            let namespace = namespace.encode_in_url();
+            Box::pin(async move {
+            sqlx::query(&format!("select table_namespace, table_name, 
metadata_location, previous_metadata_location from iceberg_tables where 
catalog_name = '{}' and table_namespace = '{}';",&name, 
&namespace)).fetch_all(&mut **txn).await
+        })}).await.map_err(from_sqlx_error)?;
+        let iter = rows.iter().map(query_map);
+
+        Ok(iter
+            .map(|x| {
+                x.and_then(|y| {
+                    let namespace = NamespaceIdent::from_vec(
+                        y.table_namespace
+                            .split('.')
+                            .map(ToString::to_string)
+                            .collect::<Vec<_>>(),
+                    )
+                    .map_err(|err| sqlx::Error::Decode(Box::new(err)))?;
+                    Ok(TableIdent::new(namespace, y.table_name))
+                })
+            })
+            .collect::<std::result::Result<_, sqlx::Error>>()
+            .map_err(from_sqlx_error)?)
+    }
+
+    async fn stat_table(&self, identifier: &TableIdent) -> Result<bool> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let catalog_name = self.name.clone();
+            let namespace = identifier.namespace().encode_in_url();
+            let name = identifier.name().to_string();
+            Box::pin(async move {
+            sqlx::query(&format!("select table_namespace, table_name, 
metadata_location, previous_metadata_location from iceberg_tables where 
catalog_name = '{}' and table_namespace = '{}' and table_name = 
'{}';",&catalog_name,
+                &namespace,
+                &name)).fetch_all(&mut **txn).await
+        })}).await.map_err(from_sqlx_error)?;
+        let mut iter = rows.iter().map(query_map);
+
+        Ok(iter.next().is_some())
+    }
+
+    async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> {
+        todo!()
+    }
+
+    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
+        let metadata_location = {

Review Comment:
   Also, the insertion should not be blind; we need to check its version first.
My suggestion is to remove the cache for now so that things don't get too
complicated.



##########
crates/catalog/sql/src/catalog.rs:
##########
@@ -0,0 +1,397 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use dashmap::DashMap;
+use futures::{lock::Mutex, AsyncReadExt, AsyncWriteExt};
+use sqlx::{
+    any::{install_default_drivers, AnyConnectOptions, AnyRow},
+    AnyConnection, ConnectOptions, Connection, Row,
+};
+use std::collections::HashMap;
+
+use iceberg::{
+    io::FileIO, spec::TableMetadata, table::Table, Catalog, Error, ErrorKind, 
Namespace,
+    NamespaceIdent, Result, TableCommit, TableCreation, TableIdent,
+};
+use uuid::Uuid;
+
+use crate::error::from_sqlx_error;
+
+#[derive(Debug)]
+/// Sql catalog implementation.
+pub struct SqlCatalog {
+    name: String,
+    connection: Arc<Mutex<AnyConnection>>,
+    storage: FileIO,
+    cache: Arc<DashMap<TableIdent, (String, TableMetadata)>>,
+}
+
+impl SqlCatalog {
+    /// Create new sql catalog instance
+    pub async fn new(url: &str, name: &str, storage: FileIO) -> Result<Self> {
+        install_default_drivers();
+
+        let mut connection = AnyConnectOptions::connect(
+            
&AnyConnectOptions::from_url(&url.try_into()?).map_err(from_sqlx_error)?,
+        )
+        .await
+        .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists iceberg_tables (
+                                catalog_name text not null,
+                                table_namespace text not null,
+                                table_name text not null,
+                                metadata_location text not null,
+                                previous_metadata_location text,
+                                primary key (catalog_name, table_namespace, 
table_name)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        connection
+            .transaction(|txn| {
+                Box::pin(async move {
+                    sqlx::query(
+                        "create table if not exists 
iceberg_namespace_properties (
+                                catalog_name text not null,
+                                namespace text not null,
+                                property_key text,
+                                property_value text,
+                                primary key (catalog_name, namespace, 
property_key)
+                            );",
+                    )
+                    .execute(&mut **txn)
+                    .await
+                })
+            })
+            .await
+            .map_err(from_sqlx_error)?;
+
+        Ok(SqlCatalog {
+            name: name.to_owned(),
+            connection: Arc::new(Mutex::new(connection)),
+            storage,
+            cache: Arc::new(DashMap::new()),
+        })
+    }
+}
+
+#[derive(Debug)]
+struct TableRef {
+    table_namespace: String,
+    table_name: String,
+    metadata_location: String,
+    _previous_metadata_location: Option<String>,
+}
+
+fn query_map(row: &AnyRow) -> std::result::Result<TableRef, sqlx::Error> {
+    Ok(TableRef {
+        table_namespace: row.try_get(0)?,
+        table_name: row.try_get(1)?,
+        metadata_location: row.try_get(2)?,
+        _previous_metadata_location: row.try_get::<String, 
_>(3).map(Some).or_else(|err| {
+            if let sqlx::Error::ColumnDecode {
+                index: _,
+                source: _,
+            } = err
+            {
+                Ok(None)
+            } else {
+                Err(err)
+            }
+        })?,
+    })
+}
+
+#[async_trait]
+impl Catalog for SqlCatalog {
+    async fn list_namespaces(
+        &self,
+        _parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let mut connection = self.connection.lock().await;
+        let rows = connection.transaction(|txn|{
+            let name = self.name.clone();
+            Box::pin(async move {
+            sqlx::query(&format!("select distinct table_namespace from 
iceberg_tables where catalog_name = '{}';",&name)).fetch_all(&mut **txn).await

Review Comment:
   There are several issues with this implementation:
   1. If `parent` is `None`, we should list all namespaces.
   2. We should also count namespaces in `iceberg_namespace_properties`.
   3. We should list only sub-namespaces.
   
   See the [Java 
implementation](https://github.com/apache/iceberg/blob/c07fa0f9c208ecb2e0824b63ebd1ffc9e260f2bb/core/src/main/java/org/apache/iceberg/jdbc/JdbcCatalog.java#L332)
 for reference.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to