liurenjie1024 commented on code in PR #475: URL: https://github.com/apache/iceberg-rust/pull/475#discussion_r1689983750
########## crates/catalog/inmemory/src/lib.rs: ########## @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Iceberg in-memory Catalog API implementation. + +#![deny(missing_docs)] +#![feature(map_try_insert)] Review Comment: We use stable rust to build this library so that downstream users will not be forced to use unstable rust. ########## crates/catalog/inmemory/src/namespace_state.rs: ########## @@ -0,0 +1,306 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +use iceberg::{Error, ErrorKind, NamespaceIdent, Result, TableIdent}; +use itertools::Itertools; +use std::collections::{hash_map, HashMap}; + +// Represents the state of a namespace +#[derive(Debug, Clone, Default)] +pub(crate) struct NamespaceState { + // Properties of this namespace + properties: HashMap<String, String>, + // Namespaces nested inside this namespace + namespaces: HashMap<String, NamespaceState>, + // Mapping of tables to metadata locations in this namespace + table_metadata_locations: HashMap<String, String>, +} + +fn no_such_namespace_err<T>(namespace_ident: &NamespaceIdent) -> Result<T> { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such namespace: {:?}", namespace_ident), + )) +} + +fn no_such_table_err<T>(table_ident: &TableIdent) -> Result<T> { + Err(Error::new( + ErrorKind::Unexpected, + format!("No such table: {:?}", table_ident), + )) +} + +fn namespace_already_exists_err<T>(namespace_ident: &NamespaceIdent) -> Result<T> { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create namespace {:?}. Namespace already exists.", + namespace_ident + ), + )) +} + +fn table_already_exists_err<T>(table_ident: &TableIdent) -> Result<T> { + Err(Error::new( + ErrorKind::Unexpected, + format!( + "Cannot create table {:?}. Table already exists.", + table_ident + ), + )) +} + +impl NamespaceState { + // Creates a new namespace state + pub(crate) fn new() -> Self { Review Comment: nit: We can implement `Default` for it. ########## crates/catalog/inmemory/src/catalog.rs: ########## @@ -0,0 +1,1466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains in-memory catalog implementation. + +use futures::lock::Mutex; +use iceberg::io::FileIO; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; +use itertools::Itertools; +use std::collections::HashMap; +use uuid::Uuid; + +use async_trait::async_trait; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use crate::namespace_state::NamespaceState; + +/// In-memory catalog implementation. +#[derive(Debug)] +pub struct InMemoryCatalog { + root_namespace_state: Mutex<NamespaceState>, + file_io: FileIO, +} + +impl InMemoryCatalog { + /// Creates an in-memory catalog. + pub fn new(file_io: FileIO) -> Self { + Self { + root_namespace_state: Mutex::new(NamespaceState::new()), + file_io, + } + } +} + +#[async_trait] +impl Catalog for InMemoryCatalog { + /// List namespaces inside the Catalog. Review Comment: ```suggestion /// List namespaces inside the catalog. ``` ########## crates/iceberg/src/catalog/mod.rs: ########## @@ -35,7 +35,7 @@ use uuid::Uuid; /// The catalog API for Iceberg Rust. #[async_trait] pub trait Catalog: Debug + Sync + Send { - /// List namespaces from table. + /// List namespaces inside the Catalog. Review Comment: ```suggestion /// List namespaces inside the catalog. 
``` ########## crates/catalog/inmemory/src/catalog.rs: ########## @@ -0,0 +1,1466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains in-memory catalog implementation. + +use futures::lock::Mutex; +use iceberg::io::FileIO; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; +use itertools::Itertools; +use std::collections::HashMap; +use uuid::Uuid; + +use async_trait::async_trait; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use crate::namespace_state::NamespaceState; + +/// In-memory catalog implementation. +#[derive(Debug)] +pub struct InMemoryCatalog { + root_namespace_state: Mutex<NamespaceState>, + file_io: FileIO, +} + +impl InMemoryCatalog { + /// Creates an in-memory catalog. + pub fn new(file_io: FileIO) -> Self { + Self { + root_namespace_state: Mutex::new(NamespaceState::new()), + file_io, + } + } +} + +#[async_trait] +impl Catalog for InMemoryCatalog { + /// List namespaces inside the Catalog. 
+ async fn list_namespaces( + &self, + maybe_parent: Option<&NamespaceIdent>, + ) -> Result<Vec<NamespaceIdent>> { + let root_namespace_state = self.root_namespace_state.lock().await; + + match maybe_parent { + None => { + let namespaces = root_namespace_state + .list_top_level_namespaces() + .into_iter() + .map(|str| NamespaceIdent::new(str.to_string())) + .collect_vec(); + + Ok(namespaces) + } + Some(parent_namespace_ident) => { + let namespaces = root_namespace_state + .list_namespaces_under(parent_namespace_ident)? + .into_iter() + .map(|name| NamespaceIdent::new(name.to_string())) + .collect_vec(); + + Ok(namespaces) + } + } + } + + /// Create a new namespace inside the catalog. + async fn create_namespace( + &self, + namespace_ident: &NamespaceIdent, + properties: HashMap<String, String>, + ) -> Result<Namespace> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?; + let namespace = Namespace::with_properties(namespace_ident.clone(), properties); + + Ok(namespace) + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> { + let root_namespace_state = self.root_namespace_state.lock().await; + + let namespace = Namespace::with_properties( + namespace_ident.clone(), + root_namespace_state + .get_properties(namespace_ident)? + .clone(), + ); + + Ok(namespace) + } + + /// Check if namespace exists in catalog. + async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> { + let guarded_namespaces = self.root_namespace_state.lock().await; + + Ok(guarded_namespaces.namespace_exists(namespace_ident)) + } + + /// Update a namespace inside the catalog. + /// + /// # Behavior + /// + /// The properties must be the full set of namespace. 
+ async fn update_namespace( + &self, + namespace_ident: &NamespaceIdent, + properties: HashMap<String, String>, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.replace_properties(namespace_ident, properties) + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.remove_existing_namespace(namespace_ident) + } + + /// List tables from namespace. + async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> { + let root_namespace_state = self.root_namespace_state.lock().await; + + let table_names = root_namespace_state.list_tables(namespace_ident)?; + let table_idents = table_names + .into_iter() + .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone())) + .collect_vec(); + + Ok(table_idents) + } + + /// Create a new table inside the namespace. + async fn create_table( + &self, + namespace_ident: &NamespaceIdent, + table_creation: TableCreation, + ) -> Result<Table> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let table_name = table_creation.name.clone(); + let table_ident = TableIdent::new(namespace_ident.clone(), table_name); + + let (table_creation, location) = match table_creation.location.clone() { + Some(location) => (table_creation, location), + None => { + let location = format!( Review Comment: As with other catalogs, shouldn't we let users configure a path as the warehouse root and create tables under that root path? ########## crates/catalog/inmemory/Cargo.toml: ########## @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership.
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +name = "iceberg-catalog-inmemory" Review Comment: +1 ########## crates/catalog/inmemory/src/catalog.rs: ########## @@ -0,0 +1,1466 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains in-memory catalog implementation. 
+ +use futures::lock::Mutex; +use iceberg::io::FileIO; +use iceberg::spec::{TableMetadata, TableMetadataBuilder}; +use itertools::Itertools; +use std::collections::HashMap; +use uuid::Uuid; + +use async_trait::async_trait; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use crate::namespace_state::NamespaceState; + +/// In-memory catalog implementation. +#[derive(Debug)] +pub struct InMemoryCatalog { + root_namespace_state: Mutex<NamespaceState>, + file_io: FileIO, +} + +impl InMemoryCatalog { + /// Creates an in-memory catalog. + pub fn new(file_io: FileIO) -> Self { + Self { + root_namespace_state: Mutex::new(NamespaceState::new()), + file_io, + } + } +} + +#[async_trait] +impl Catalog for InMemoryCatalog { + /// List namespaces inside the Catalog. + async fn list_namespaces( + &self, + maybe_parent: Option<&NamespaceIdent>, + ) -> Result<Vec<NamespaceIdent>> { + let root_namespace_state = self.root_namespace_state.lock().await; + + match maybe_parent { + None => { + let namespaces = root_namespace_state + .list_top_level_namespaces() + .into_iter() + .map(|str| NamespaceIdent::new(str.to_string())) + .collect_vec(); + + Ok(namespaces) + } + Some(parent_namespace_ident) => { + let namespaces = root_namespace_state + .list_namespaces_under(parent_namespace_ident)? + .into_iter() + .map(|name| NamespaceIdent::new(name.to_string())) + .collect_vec(); + + Ok(namespaces) + } + } + } + + /// Create a new namespace inside the catalog. 
+ async fn create_namespace( + &self, + namespace_ident: &NamespaceIdent, + properties: HashMap<String, String>, + ) -> Result<Namespace> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.insert_new_namespace(namespace_ident, properties.clone())?; + let namespace = Namespace::with_properties(namespace_ident.clone(), properties); + + Ok(namespace) + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<Namespace> { + let root_namespace_state = self.root_namespace_state.lock().await; + + let namespace = Namespace::with_properties( + namespace_ident.clone(), + root_namespace_state + .get_properties(namespace_ident)? + .clone(), + ); + + Ok(namespace) + } + + /// Check if namespace exists in catalog. + async fn namespace_exists(&self, namespace_ident: &NamespaceIdent) -> Result<bool> { + let guarded_namespaces = self.root_namespace_state.lock().await; + + Ok(guarded_namespaces.namespace_exists(namespace_ident)) + } + + /// Update a namespace inside the catalog. + /// + /// # Behavior + /// + /// The properties must be the full set of namespace. + async fn update_namespace( + &self, + namespace_ident: &NamespaceIdent, + properties: HashMap<String, String>, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.replace_properties(namespace_ident, properties) + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, namespace_ident: &NamespaceIdent) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.remove_existing_namespace(namespace_ident) + } + + /// List tables from namespace. 
+ async fn list_tables(&self, namespace_ident: &NamespaceIdent) -> Result<Vec<TableIdent>> { + let root_namespace_state = self.root_namespace_state.lock().await; + + let table_names = root_namespace_state.list_tables(namespace_ident)?; + let table_idents = table_names + .into_iter() + .map(|table_name| TableIdent::new(namespace_ident.clone(), table_name.clone())) + .collect_vec(); + + Ok(table_idents) + } + + /// Create a new table inside the namespace. + async fn create_table( + &self, + namespace_ident: &NamespaceIdent, + table_creation: TableCreation, + ) -> Result<Table> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let table_name = table_creation.name.clone(); + let table_ident = TableIdent::new(namespace_ident.clone(), table_name); + + let (table_creation, location) = match table_creation.location.clone() { + Some(location) => (table_creation, location), + None => { + let location = format!( + "{}/{}", + table_ident.namespace().join("/"), + table_ident.name() + ); + + let new_table_creation = TableCreation { + location: Some(location.clone()), + ..table_creation + }; + + (new_table_creation, location) + } + }; + + let metadata = TableMetadataBuilder::from_table_creation(table_creation)?.build()?; + let metadata_location = format!( + "{}/metadata/{}-{}.metadata.json", + &location, + 0, + Uuid::new_v4() + ); + + self.file_io + .new_output(&metadata_location)? + .write(serde_json::to_vec(&metadata)?.into()) + .await?; + + root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?; + + let table = Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location) + .metadata(metadata) + .identifier(table_ident) + .build(); + + Ok(table) + } + + /// Load table from the catalog. 
+ async fn load_table(&self, table_ident: &TableIdent) -> Result<Table> { + let root_namespace_state = self.root_namespace_state.lock().await; + + let metadata_location = root_namespace_state.get_existing_table_location(table_ident)?; + let input_file = self.file_io.new_input(metadata_location)?; + let metadata_content = input_file.read().await?; + let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; + let table = Table::builder() + .file_io(self.file_io.clone()) + .metadata_location(metadata_location.clone()) + .metadata(metadata) + .identifier(table_ident.clone()) + .build(); + + Ok(table) + } + + /// Drop a table from the catalog. + async fn drop_table(&self, table_ident: &TableIdent) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.remove_existing_table(table_ident) + } + + /// Check if a table exists in the catalog. + async fn table_exists(&self, table_ident: &TableIdent) -> Result<bool> { + let root_namespace_state = self.root_namespace_state.lock().await; + + root_namespace_state.table_exists(table_ident) + } + + /// Rename a table in the catalog. + async fn rename_table( + &self, + src_table_ident: &TableIdent, + dst_table_ident: &TableIdent, + ) -> Result<()> { + let mut root_namespace_state = self.root_namespace_state.lock().await; + + let mut new_root_namespace_state = root_namespace_state.clone(); Review Comment: Why do we need to clone this? Why not just modify it in place? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org