fqaiser94 commented on code in PR #475:
URL: https://github.com/apache/iceberg-rust/pull/475#discussion_r1693080874


##########
crates/catalog/inmemory/src/catalog.rs:
##########
@@ -0,0 +1,1466 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains in-memory catalog implementation.
+
+use futures::lock::Mutex;
+use iceberg::io::FileIO;
+use iceberg::spec::{TableMetadata, TableMetadataBuilder};
+use itertools::Itertools;
+use std::collections::HashMap;
+use uuid::Uuid;
+
+use async_trait::async_trait;
+
+use iceberg::table::Table;
+use iceberg::Result;
+use iceberg::{
+    Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, 
TableCreation, TableIdent,
+};
+
+use crate::namespace_state::NamespaceState;
+
+/// In-memory catalog implementation.
+///
+/// Namespace and table bookkeeping is kept in process memory; table metadata
+/// files themselves are written and read through `file_io`.
+#[derive(Debug)]
+pub struct InMemoryCatalog {
+    /// State of all namespaces and the tables registered in them. It is guarded by an
+    /// async (`futures::lock::Mutex`) lock because some operations hold it across `.await`.
+    root_namespace_state: Mutex<NamespaceState>,
+    /// File IO used to write and read table metadata JSON files.
+    file_io: FileIO,
+}
+
+impl InMemoryCatalog {
+    /// Creates an in-memory catalog that starts from a fresh `NamespaceState` and
+    /// persists table metadata through the given `file_io`.
+    pub fn new(file_io: FileIO) -> Self {
+        let root_namespace_state = Mutex::new(NamespaceState::new());
+        Self {
+            root_namespace_state,
+            file_io,
+        }
+    }
+}
+
+#[async_trait]
+impl Catalog for InMemoryCatalog {
+    /// List namespaces inside the Catalog.
+    ///
+    /// With no parent, returns the top-level namespaces. With a parent, returns its
+    /// children as fully qualified identifiers (the parent's levels followed by the child
+    /// name), so the results can be passed straight back to other catalog methods.
+    ///
+    /// # Errors
+    ///
+    /// Propagates any error from listing the children of `maybe_parent`, or from building
+    /// a child identifier.
+    async fn list_namespaces(
+        &self,
+        maybe_parent: Option<&NamespaceIdent>,
+    ) -> Result<Vec<NamespaceIdent>> {
+        let root_namespace_state = self.root_namespace_state.lock().await;
+
+        match maybe_parent {
+            None => {
+                let namespaces = root_namespace_state
+                    .list_top_level_namespaces()
+                    .into_iter()
+                    .map(|str| NamespaceIdent::new(str.to_string()))
+                    .collect_vec();
+
+                Ok(namespaces)
+            }
+            Some(parent_namespace_ident) => root_namespace_state
+                .list_namespaces_under(parent_namespace_ident)?
+                .into_iter()
+                .map(|name| {
+                    // Prefix each child with the parent's levels; a bare child name would
+                    // otherwise identify a top-level namespace instead of this child.
+                    let mut levels = parent_namespace_ident.to_vec();
+                    levels.push(name.to_string());
+                    NamespaceIdent::from_vec(levels)
+                })
+                .collect::<Result<Vec<_>>>(),
+        }
+    }
+
+    /// Create a new namespace inside the catalog.
+    ///
+    /// The namespace is registered with a copy of `properties`, and the same properties
+    /// are carried on the returned [`Namespace`].
+    async fn create_namespace(
+        &self,
+        namespace_ident: &NamespaceIdent,
+        properties: HashMap<String, String>,
+    ) -> Result<Namespace> {
+        let mut state = self.root_namespace_state.lock().await;
+        state.insert_new_namespace(namespace_ident, properties.clone())?;
+
+        Ok(Namespace::with_properties(namespace_ident.clone(), properties))
+    }
+
+    /// Fetch a namespace, together with a copy of its stored properties, from the catalog.
+    async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> {
+        let state = self.root_namespace_state.lock().await;
+        let properties = state.get_properties(namespace_ident)?.clone();
+
+        Ok(Namespace::with_properties(namespace_ident.clone(), properties))
+    }
+
+    /// Report whether `namespace_ident` is registered in the catalog.
+    async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> {
+        let exists = self
+            .root_namespace_state
+            .lock()
+            .await
+            .namespace_exists(namespace_ident);
+
+        Ok(exists)
+    }
+
+    /// Update a namespace inside the catalog.
+    ///
+    /// # Behavior
+    ///
+    /// `properties` replaces the namespace's existing properties wholesale, so callers
+    /// must pass the full set rather than only the changed entries.
+    async fn update_namespace(
+        &self,
+        namespace_ident: &NamespaceIdent,
+        properties: HashMap<String, String>,
+    ) -> Result<()> {
+        self.root_namespace_state
+            .lock()
+            .await
+            .replace_properties(namespace_ident, properties)
+    }
+
+    /// Drop a namespace from the catalog.
+    async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> 
Result<()> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        root_namespace_state.remove_existing_namespace(namespace_ident)
+    }
+
+    /// List the identifiers of the tables registered in `namespace_ident`.
+    async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> {
+        let state = self.root_namespace_state.lock().await;
+
+        Ok(state
+            .list_tables(namespace_ident)?
+            .into_iter()
+            .map(|name| TableIdent::new(namespace_ident.clone(), name.clone()))
+            .collect_vec())
+    }
+
+    /// Create a new table inside the namespace.
+    ///
+    /// When `table_creation.location` is `None`, the table is placed at
+    /// `<namespace levels joined by '/'>/<table name>`. The initial metadata is written to
+    /// `<location>/metadata/0-<uuid>.metadata.json` through `file_io`.
+    ///
+    /// The table is registered in the catalog state *before* its metadata file is written.
+    /// A rejected registration therefore leaves no orphaned metadata file behind. If the
+    /// write itself fails, the registration is rolled back.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the metadata cannot be built, if the table cannot be registered
+    /// in the namespace state, or if writing the metadata file fails.
+    async fn create_table(
+        &self,
+        namespace_ident: &NamespaceIdent,
+        table_creation: TableCreation,
+    ) -> Result<Table> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        let table_name = table_creation.name.clone();
+        let table_ident = TableIdent::new(namespace_ident.clone(), table_name);
+
+        let (table_creation, location) = match table_creation.location.clone() {
+            Some(location) => (table_creation, location),
+            None => {
+                let location = format!(
+                    "{}/{}",
+                    table_ident.namespace().join("/"),
+                    table_ident.name()
+                );
+
+                let new_table_creation = TableCreation {
+                    location: Some(location.clone()),
+                    ..table_creation
+                };
+
+                (new_table_creation, location)
+            }
+        };
+
+        let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?;
+        let metadata_location = format!(
+            "{}/metadata/{}-{}.metadata.json",
+            &location,
+            0,
+            Uuid::new_v4()
+        );
+
+        // Register first. The lock is held for the whole call, so no other catalog operation
+        // can observe the entry before its metadata file exists, and a rejected registration
+        // (presumably e.g. a duplicate name) no longer leaves an orphaned file in storage.
+        root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
+
+        let write_result = async {
+            self.file_io
+                .new_output(&metadata_location)?
+                .write(serde_json::to_vec(&metadata)?.into())
+                .await
+        }
+        .await;
+
+        if let Err(write_error) = write_result {
+            // Undo the registration so the catalog never points at a file that was not
+            // written. The entry was inserted just above under the same lock, and the write
+            // error is the one worth reporting, so the removal result is deliberately ignored.
+            let _ = root_namespace_state.remove_existing_table(&table_ident);
+            return Err(write_error);
+        }
+
+        let table = Table::builder()
+            .file_io(self.file_io.clone())
+            .metadata_location(metadata_location)
+            .metadata(metadata)
+            .identifier(table_ident)
+            .build();
+
+        Ok(table)
+    }
+
+    /// Load table from the catalog.
+    ///
+    /// Looks up the table's recorded metadata location, reads that file through `file_io`,
+    /// and deserializes it from JSON into [`TableMetadata`].
+    async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> {
+        let state = self.root_namespace_state.lock().await;
+        let metadata_location = state.get_existing_table_location(table_ident)?;
+
+        let raw_metadata = self.file_io.new_input(metadata_location)?.read().await?;
+        let metadata: TableMetadata = serde_json::from_slice(&raw_metadata)?;
+
+        Ok(Table::builder()
+            .file_io(self.file_io.clone())
+            .metadata_location(metadata_location.clone())
+            .metadata(metadata)
+            .identifier(table_ident.clone())
+            .build())
+    }
+
+    /// Drop a table from the catalog.
+    async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        root_namespace_state.remove_existing_table(table_ident)
+    }
+
+    /// Report whether `table_ident` is registered in the catalog.
+    ///
+    /// Errors from the namespace-state lookup are propagated rather than mapped to `false`.
+    async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> {
+        let state = self.root_namespace_state.lock().await;
+        let exists = state.table_exists(table_ident)?;
+        Ok(exists)
+    }
+
+    /// Rename a table in the catalog.
+    async fn rename_table(
+        &self,
+        src_table_ident: &TableIdent,
+        dst_table_ident: &TableIdent,
+    ) -> Result<()> {
+        let mut root_namespace_state = self.root_namespace_state.lock().await;
+
+        let mut new_root_namespace_state = root_namespace_state.clone();

Review Comment:
   Great question! 
   
   The cloning is deliberate. The `rename_table` method requires us to perform two separate (i.e., non-atomic) modifications to the catalog state:
   1. Remove the src table
   2. Add the dst table
   
   Either of these modifications could fail, but I'm particularly concerned about the case where the second one fails.
   If we modified the state in place and the second modification failed, we would leave the catalog in a bad state, because the src table would already have been removed.
   To avoid this scenario, I'm using an "immutable-atomic-swap" style approach: it modifies a copy of the catalog state first, and then swaps the pointer to this new copy only if every step succeeds.
   
   Let me know if that makes sense. 
   
   There are other ways to do this but I found this approach to be the 
"cleanest."



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to