liurenjie1024 commented on code in PR #475:
URL: https://github.com/apache/iceberg-rust/pull/475#discussion_r1693113401


##########
crates/catalog/inmemory/src/catalog.rs:
##########
@@ -0,0 +1,1466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains in-memory catalog implementation.
+
+use futures::lock::Mutex;
+use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
+use itertools::Itertools;
+use std::collections::HashMap;
+use uuid::Uuid;
+
+use async_trait::async_trait;
+
+use iceberg::table::Table;
+use iceberg::Result;
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, 
TableCreation, TableIdent,
+};
+
+use crate::namespace_state::NamespaceState;
+
+/// In-memory catalog implementation.
+#[derive(Debug)]
+pub struct InMemoryCatalog {
+    root_namespace_state: Mutex<NamespaceState>,
+    file_io: FileIO,
+}
+
+impl InMemoryCatalog {
+    /// Creates an in-memory catalog.
+    pub fn new(file_io: FileIO) -> Self {
+        Self {
+            root_namespace_state: Mutex::new(NamespaceState::new()),
+            file_io,
+        }
+    }
+}
+
+#[async_trait]
+impl Catalog for InMemoryCatalog {
+    /// List namespaces inside the Catalog.
+    async fn list_namespaces(
+        &self,
+        maybe_parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let root_namespace_state = self.root_namespace_state.lock().await;
+
+        match maybe_parent {
+            None => {
+                let namespaces = root_namespace_state
+                    .list_top_level_namespaces()
+                    .into_iter()
+                    .map(|str| NamespaceIdent::new(str.to_string()))
+                    .collect_vec();
+
+                Ok(namespaces)
+            }
+            Some(parent_namespace_ident) => {
+                let namespaces = root_namespace_state
+                    .list_namespaces_under(parent_namespace_ident)?
+                    .into_iter()
+                    .map(|name| NamespaceIdent::new(name.to_string()))
+                    .collect_vec();
+
+                Ok(namespaces)
+            }
+        }
+    }
+
+    /// Create a new namespace inside the catalog.
+    async fn create_namespace(
+        &self,
+        namespace_ident: &NamespaceIdent,
+        properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        root_namespace_state.insert_new_namespace(namespace_ident, 
properties.clone())?;
+        let namespace = Namespace::with_properties(namespace_ident.clone(), 
properties);
+
+        Ok(namespace)
+    }
+
+    /// Get a namespace information from the catalog.
+    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> 
Result<Namespace> {
+        let root_namespace_state = self.root_namespace_state.lock().await;
+
+        let namespace = Namespace::with_properties(
+            namespace_ident.clone(),
+            root_namespace_state
+                .get_properties(namespace_ident)?
+                .clone(),
+        );
+
+        Ok(namespace)
+    }
+
+    /// Check if namespace exists in catalog.
+    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> 
Result<bool> {
+        let guarded_namespaces = self.root_namespace_state.lock().await;
+
+        Ok(guarded_namespaces.namespace_exists(namespace_ident))
+    }
+
+    /// Update a namespace inside the catalog.
+    ///
+    /// # Behavior
+    ///
+    /// The properties must be the full set of namespace.
+    async fn update_namespace(
+        &self,
+        namespace_ident: &NamespaceIdent,
+        properties: HashMap<String, String>,
+    ) -> Result<()> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        root_namespace_state.replace_properties(namespace_ident, properties)
+    }
+
+    /// Drop a namespace from the catalog.
+    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> 
Result<()> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        root_namespace_state.remove_existing_namespace(namespace_ident)
+    }
+
+    /// List tables from namespace.
+    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> 
Result<Vec<TableIdent>> {
+        let root_namespace_state = self.root_namespace_state.lock().await;
+
+        let table_names = root_namespace_state.list_tables(namespace_ident)?;
+        let table_idents = table_names
+            .into_iter()
+            .map(|table_name| TableIdent::new(namespace_ident.clone(), 
table_name.clone()))
+            .collect_vec();
+
+        Ok(table_idents)
+    }
+
+    /// Create a new table inside the namespace.
+    async fn create_table(
+        &self,
+        namespace_ident: &NamespaceIdent,
+        table_creation: TableCreation,
+    ) -> Result<Table> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        let table_name = table_creation.name.clone();
+        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
+
+        let (table_creation, location) = match table_creation.location.clone() 
{
+            Some(location) => (table_creation, location),
+            None => {
+                let location = format!(
+                    "{}/{}",
+                    table_ident.namespace().join("/"),
+                    table_ident.name()
+                );
+
+                let new_table_creation = TableCreation {
+                    location: Some(location.clone()),
+                    ..table_creation
+                };
+
+                (new_table_creation, location)
+            }
+        };
+
+        let metadata = 
TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+        let metadata_location = format!(
+            "{}/metadata/{}-{}.metadata.json",
+            &location,
+            0,
+            Uuid::new_v4()
+        );
+
+        self.file_io
+            .new_output(&metadata_location)?
+            .write(serde_json::to_vec(&metadata)?.into())
+            .await?;
+
+        root_namespace_state.insert_new_table(&table_ident, 
metadata_location.clone())?;
+
+        let table = Table::builder()
+            .file_io(self.file_io.clone())
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .identifier(table_ident)
+            .build();
+
+        Ok(table)
+    }
+
+    /// Load table from the catalog.
+    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
+        let root_namespace_state = self.root_namespace_state.lock().await;
+
+        let metadata_location = 
root_namespace_state.get_existing_table_location(table_ident)?;
+        let input_file = self.file_io.new_input(metadata_location)?;
+        let metadata_content = input_file.read().await?;
+        let metadata = 
serde_json::from_slice::<TableMetadata>(&metadata_content)?;
+        let table = Table::builder()
+            .file_io(self.file_io.clone())
+            .metadata_location(metadata_location.clone())
+            .metadata(metadata)
+            .identifier(table_ident.clone())
+            .build();
+
+        Ok(table)
+    }
+
+    /// Drop a table from the catalog.
+    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        root_namespace_state.remove_existing_table(table_ident)
+    }
+
+    /// Check if a table exists in the catalog.
+    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
+        let root_namespace_state = self.root_namespace_state.lock().await;
+
+        root_namespace_state.table_exists(table_ident)
+    }
+
+    /// Rename a table in the catalog.
+    async fn rename_table(
+        &self,
+        src_table_ident: &TableIdent,
+        dst_table_ident: &TableIdent,
+    ) -> Result<()> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        let mut new_root_namespace_state = root_namespace_state.clone();

Review Comment:
   Thanks for this explanation, it makes sense to me! My only concern
is that cloning may be quite slow for a large catalog, but given that this
in-memory catalog is mostly used for tests and demos, I think this is acceptable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to