Fokko commented on code in PR #78: URL: https://github.com/apache/iceberg-rust/pull/78#discussion_r1363673663
########## crates/catalog/rest/src/catalog.rs: ########## @@ -0,0 +1,845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains rest catalog implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use reqwest::{Client, Request}; +use serde::de::DeserializeOwned; +use typed_builder::TypedBuilder; +use urlencoding::encode; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use self::_serde::{ + CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse, + NamespaceSerde, RenameTableRequest, NO_CONTENT, OK, +}; + +const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; Review Comment: I think you've copied this from the PyIceberg repository. I think we need to bump this. Along the way, quite a few fixes have been made to the spec. ########## crates/catalog/rest/src/catalog.rs: ########## @@ -0,0 +1,845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains rest catalog implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use reqwest::{Client, Request}; +use serde::de::DeserializeOwned; +use typed_builder::TypedBuilder; +use urlencoding::encode; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use self::_serde::{ + CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse, + NamespaceSerde, RenameTableRequest, NO_CONTENT, OK, +}; + +const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; +const PATH_V1: &str = "v1"; + +/// Rest catalog configuration. 
+#[derive(Debug, TypedBuilder)] +pub struct RestCatalogConfig { + uri: String, + #[builder(default, setter(strip_option))] + warehouse: Option<String>, + + #[builder(default)] + props: HashMap<String, String>, +} + +impl RestCatalogConfig { + fn config_endpoint(&self) -> String { + [&self.uri, PATH_V1, "config"].join("/") + } + + fn namespaces_endpoint(&self) -> String { + [&self.uri, PATH_V1, "namespaces"].join("/") + } + + fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String { + [&self.uri, PATH_V1, "namespaces", &ns.encode_in_url()].join("/") + } + + fn tables_endpoint(&self, ns: &NamespaceIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &ns.encode_in_url(), + "tables", + ] + .join("/") + } + + fn rename_table_endpoint(&self) -> String { + [&self.uri, PATH_V1, "tables", "rename"].join("/") + } + + fn table_endpoint(&self, table: &TableIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &table.namespace.encode_in_url(), + "tables", + encode(&table.name).as_ref(), + ] + .join("/") + } + + fn try_create_rest_client(&self) -> Result<HttpClient> { + //TODO: We will add oauth, ssl config, sigv4 later + let mut headers = HeaderMap::new(); Review Comment: I'm learning from @Xuanwo: https://github.com/apache/iceberg-rust/pull/76/files#r1363173256 It looks like we can use `from_iter` on the `HeaderMap` as well: https://docs.rs/http/latest/http/header/struct.HeaderMap.html ########## crates/catalog/rest/src/catalog.rs: ########## @@ -0,0 +1,845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains rest catalog implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use reqwest::{Client, Request}; +use serde::de::DeserializeOwned; +use typed_builder::TypedBuilder; +use urlencoding::encode; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use self::_serde::{ + CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse, + NamespaceSerde, RenameTableRequest, NO_CONTENT, OK, +}; + +const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; +const PATH_V1: &str = "v1"; + +/// Rest catalog configuration. 
+#[derive(Debug, TypedBuilder)] +pub struct RestCatalogConfig { + uri: String, + #[builder(default, setter(strip_option))] + warehouse: Option<String>, + + #[builder(default)] + props: HashMap<String, String>, +} + +impl RestCatalogConfig { + fn config_endpoint(&self) -> String { + [&self.uri, PATH_V1, "config"].join("/") + } + + fn namespaces_endpoint(&self) -> String { + [&self.uri, PATH_V1, "namespaces"].join("/") + } + + fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String { + [&self.uri, PATH_V1, "namespaces", &ns.encode_in_url()].join("/") + } + + fn tables_endpoint(&self, ns: &NamespaceIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &ns.encode_in_url(), + "tables", + ] + .join("/") + } + + fn rename_table_endpoint(&self) -> String { + [&self.uri, PATH_V1, "tables", "rename"].join("/") + } + + fn table_endpoint(&self, table: &TableIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &table.namespace.encode_in_url(), + "tables", + encode(&table.name).as_ref(), + ] + .join("/") + } + + fn try_create_rest_client(&self) -> Result<HttpClient> { + //TODO: We will add oauth, ssl config, sigv4 later + let mut headers = HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + headers.insert( + HeaderName::from_static("x-client-version"), + HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION), + ); + headers.insert( + header::USER_AGENT, + HeaderValue::from_str(&format!("iceberg-rs/{}", env!("CARGO_PKG_VERSION"))).unwrap(), + ); + + Ok(HttpClient( + Client::builder().default_headers(headers).build()?, + )) + } +} + +struct HttpClient(Client); + +impl HttpClient { + async fn query< + R: DeserializeOwned, + E: DeserializeOwned + Into<Error>, + const SUCCESS_CODE: u16, + >( + &self, + request: Request, + ) -> Result<R> { + let resp = self.0.execute(request).await?; + + if resp.status().as_u16() == SUCCESS_CODE { + let text = resp.bytes().await?; + 
Ok(serde_json::from_slice::<R>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?) + } else { + let text = resp.bytes().await?; + let e = serde_json::from_slice::<E>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?; + Err(e.into()) + } + } + + async fn execute<E: DeserializeOwned + Into<Error>, const SUCCESS_CODE: u16>( + &self, + request: Request, + ) -> Result<()> { + let resp = self.0.execute(request).await?; + + if resp.status().as_u16() == SUCCESS_CODE { + Ok(()) + } else { + let text = resp.bytes().await?; + let e = serde_json::from_slice::<E>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?; + Err(e.into()) + } + } +} + +/// Rest catalog implementation. +pub struct RestCatalog { + config: RestCatalogConfig, + client: HttpClient, +} + +#[async_trait] +impl Catalog for RestCatalog { + /// List namespaces from table. + async fn list_namespaces( + &self, + parent: Option<&NamespaceIdent>, + ) -> Result<Vec<NamespaceIdent>> { + let mut request = self.client.0.get(self.config.namespaces_endpoint()); + if let Some(ns) = parent { + request = request.query(&[("parent", ns.encode_in_url())]); + } + + let resp = self + .client + .query::<ListNamespaceResponse, ErrorModel, OK>(request.build()?) + .await?; + + resp.namespaces + .into_iter() + .map(NamespaceIdent::from_vec) + .collect::<Result<Vec<NamespaceIdent>>>() + } + + /// Create a new namespace inside the catalog. 
+ async fn create_namespace( + &self, + namespace: &NamespaceIdent, + properties: HashMap<String, String>, + ) -> Result<Namespace> { + let request = self + .client + .0 + .post(self.config.namespaces_endpoint()) + .json(&NamespaceSerde { + namespace: namespace.as_ref().clone(), + properties: Some(properties), + }) + .build()?; + + let resp = self + .client + .query::<NamespaceSerde, ErrorModel, OK>(request) + .await?; + + Namespace::try_from(resp) + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> { + let request = self + .client + .0 + .get(self.config.namespace_endpoint(namespace)) + .build()?; + + let resp = self + .client + .query::<NamespaceSerde, ErrorModel, OK>(request) + .await?; + Namespace::try_from(resp) + } + + /// Update a namespace inside the catalog. + /// + /// # Behavior + /// + /// The properties must be the full set of namespace. + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap<String, String>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Updating namespace not supported yet!", + )) + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let request = self + .client + .0 + .delete(self.config.namespace_endpoint(namespace)) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// List tables from namespace. + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> { + let request = self + .client + .0 + .get(self.config.tables_endpoint(namespace)) + .build()?; + + let resp = self + .client + .query::<ListTableResponse, ErrorModel, OK>(request) + .await?; + + Ok(resp.identifiers) + } + + /// Create a new table inside the namespace. 
+ async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> Result<Table> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Creating table not supported yet!", + )) + } + + /// Load table from the catalog. + async fn load_table(&self, _table: &TableIdent) -> Result<Table> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Creating table not supported yet!", + )) + } + + /// Drop a table from the catalog. + async fn drop_table(&self, table: &TableIdent) -> Result<()> { + let request = self + .client + .0 + .delete(self.config.table_endpoint(table)) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// Check if a table exists in the catalog. + async fn stat_table(&self, table: &TableIdent) -> Result<bool> { + let request = self + .client + .0 + .head(self.config.table_endpoint(table)) + .build()?; + + self.client + .execute::<ErrorModel, NO_CONTENT>(request) + .await + .map(|_| true) + } + + /// Rename a table in the catalog. + async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { + let request = self + .client + .0 + .post(self.config.rename_table_endpoint()) + .json(&RenameTableRequest { + source: src.clone(), + destination: dest.clone(), + }) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// Update a table to the catalog. + async fn update_table(&self, _table: &TableIdent, _commit: TableCommit) -> Result<Table> { + todo!() + } + + /// Update multiple tables to the catalog as an atomic operation. + async fn update_tables(&self, _tables: &[(TableIdent, TableCommit)]) -> Result<()> { + todo!() + } Review Comment: I'm not sure if this is the way that we want to expose update tables. It is important that the right updates and requirements are set, otherwise, race conditions might occur. Also, there is little validation here, for example, can you do two distinct schema changes (so two updates), in a single commit? 
I would maybe leave this out for now so we can decide later on. We might want to introduce an API similar to PyIceberg's (which is inspired by Java): https://py.iceberg.apache.org/api/#schema-evolution ########## crates/catalog/rest/src/catalog.rs: ########## @@ -0,0 +1,845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains rest catalog implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use reqwest::{Client, Request}; +use serde::de::DeserializeOwned; +use typed_builder::TypedBuilder; +use urlencoding::encode; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use self::_serde::{ + CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse, + NamespaceSerde, RenameTableRequest, NO_CONTENT, OK, +}; + +const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; +const PATH_V1: &str = "v1"; + +/// Rest catalog configuration. 
+#[derive(Debug, TypedBuilder)] +pub struct RestCatalogConfig { + uri: String, + #[builder(default, setter(strip_option))] + warehouse: Option<String>, + + #[builder(default)] + props: HashMap<String, String>, +} + +impl RestCatalogConfig { + fn config_endpoint(&self) -> String { + [&self.uri, PATH_V1, "config"].join("/") + } + + fn namespaces_endpoint(&self) -> String { + [&self.uri, PATH_V1, "namespaces"].join("/") + } + + fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String { + [&self.uri, PATH_V1, "namespaces", &ns.encode_in_url()].join("/") + } + + fn tables_endpoint(&self, ns: &NamespaceIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &ns.encode_in_url(), + "tables", + ] + .join("/") + } + + fn rename_table_endpoint(&self) -> String { + [&self.uri, PATH_V1, "tables", "rename"].join("/") + } + + fn table_endpoint(&self, table: &TableIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &table.namespace.encode_in_url(), + "tables", + encode(&table.name).as_ref(), + ] + .join("/") + } + + fn try_create_rest_client(&self) -> Result<HttpClient> { + //TODO: We will add oauth, ssl config, sigv4 later + let mut headers = HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + headers.insert( + HeaderName::from_static("x-client-version"), + HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION), + ); + headers.insert( + header::USER_AGENT, + HeaderValue::from_str(&format!("iceberg-rs/{}", env!("CARGO_PKG_VERSION"))).unwrap(), + ); + + Ok(HttpClient( + Client::builder().default_headers(headers).build()?, + )) + } +} + +struct HttpClient(Client); + +impl HttpClient { + async fn query< + R: DeserializeOwned, + E: DeserializeOwned + Into<Error>, + const SUCCESS_CODE: u16, + >( + &self, + request: Request, + ) -> Result<R> { + let resp = self.0.execute(request).await?; + + if resp.status().as_u16() == SUCCESS_CODE { + let text = resp.bytes().await?; + 
Ok(serde_json::from_slice::<R>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?) + } else { + let text = resp.bytes().await?; + let e = serde_json::from_slice::<E>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?; + Err(e.into()) + } + } + + async fn execute<E: DeserializeOwned + Into<Error>, const SUCCESS_CODE: u16>( + &self, + request: Request, + ) -> Result<()> { + let resp = self.0.execute(request).await?; + + if resp.status().as_u16() == SUCCESS_CODE { + Ok(()) + } else { + let text = resp.bytes().await?; + let e = serde_json::from_slice::<E>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?; + Err(e.into()) + } + } +} + +/// Rest catalog implementation. +pub struct RestCatalog { + config: RestCatalogConfig, + client: HttpClient, +} + +#[async_trait] +impl Catalog for RestCatalog { + /// List namespaces from table. + async fn list_namespaces( + &self, + parent: Option<&NamespaceIdent>, + ) -> Result<Vec<NamespaceIdent>> { + let mut request = self.client.0.get(self.config.namespaces_endpoint()); + if let Some(ns) = parent { + request = request.query(&[("parent", ns.encode_in_url())]); + } + + let resp = self + .client + .query::<ListNamespaceResponse, ErrorModel, OK>(request.build()?) + .await?; + + resp.namespaces + .into_iter() + .map(NamespaceIdent::from_vec) + .collect::<Result<Vec<NamespaceIdent>>>() + } + + /// Create a new namespace inside the catalog. 
+ async fn create_namespace( + &self, + namespace: &NamespaceIdent, + properties: HashMap<String, String>, + ) -> Result<Namespace> { + let request = self + .client + .0 + .post(self.config.namespaces_endpoint()) + .json(&NamespaceSerde { + namespace: namespace.as_ref().clone(), + properties: Some(properties), + }) + .build()?; + + let resp = self + .client + .query::<NamespaceSerde, ErrorModel, OK>(request) + .await?; + + Namespace::try_from(resp) + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> { + let request = self + .client + .0 + .get(self.config.namespace_endpoint(namespace)) + .build()?; + + let resp = self + .client + .query::<NamespaceSerde, ErrorModel, OK>(request) + .await?; + Namespace::try_from(resp) + } + + /// Update a namespace inside the catalog. + /// + /// # Behavior + /// + /// The properties must be the full set of namespace. + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap<String, String>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Updating namespace not supported yet!", + )) + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let request = self + .client + .0 + .delete(self.config.namespace_endpoint(namespace)) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// List tables from namespace. + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> { + let request = self + .client + .0 + .get(self.config.tables_endpoint(namespace)) + .build()?; + + let resp = self + .client + .query::<ListTableResponse, ErrorModel, OK>(request) + .await?; + + Ok(resp.identifiers) + } + + /// Create a new table inside the namespace. 
+ async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> Result<Table> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Creating table not supported yet!", + )) + } + + /// Load table from the catalog. + async fn load_table(&self, _table: &TableIdent) -> Result<Table> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Creating table not supported yet!", + )) + } + + /// Drop a table from the catalog. + async fn drop_table(&self, table: &TableIdent) -> Result<()> { + let request = self + .client + .0 + .delete(self.config.table_endpoint(table)) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// Check if a table exists in the catalog. + async fn stat_table(&self, table: &TableIdent) -> Result<bool> { + let request = self + .client + .0 + .head(self.config.table_endpoint(table)) + .build()?; + + self.client + .execute::<ErrorModel, NO_CONTENT>(request) + .await + .map(|_| true) + } + + /// Rename a table in the catalog. + async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { + let request = self + .client + .0 + .post(self.config.rename_table_endpoint()) + .json(&RenameTableRequest { + source: src.clone(), + destination: dest.clone(), + }) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// Update a table to the catalog. + async fn update_table(&self, _table: &TableIdent, _commit: TableCommit) -> Result<Table> { + todo!() + } + + /// Update multiple tables to the catalog as an atomic operation. + async fn update_tables(&self, _tables: &[(TableIdent, TableCommit)]) -> Result<()> { + todo!() + } +} + +impl RestCatalog { + /// Creates a rest catalog from config. 
+ pub async fn new(config: RestCatalogConfig) -> Result<Self> { + let mut catalog = Self { + client: config.try_create_rest_client()?, + config, + }; + + catalog.update_config().await?; + catalog.client = catalog.config.try_create_rest_client()?; + + Ok(catalog) + } + + async fn update_config(&mut self) -> Result<()> { + let mut request = self.client.0.get(self.config.config_endpoint()); + + if let Some(warehouse_location) = &self.config.warehouse { + request = request.query(&[("warehouse", warehouse_location)]); + } + let mut config = self + .client + .query::<CatalogConfig, ErrorResponse, OK>(request.build()?) + .await?; + + config.defaults.extend(self.config.props.clone()); + config.defaults.extend(config.overrides); + + self.config.props = config.defaults; + + Ok(()) + } +} + +/// Requests and responses for rest api. +mod _serde { + use std::collections::HashMap; + + use serde_derive::{Deserialize, Serialize}; + + use iceberg::{Error, ErrorKind, Namespace, TableIdent}; + + pub(super) const OK: u16 = 200u16; + pub(super) const NO_CONTENT: u16 = 204u16; + #[derive(Clone, Debug, Serialize, Deserialize)] + pub(super) struct CatalogConfig { + pub(super) overrides: HashMap<String, String>, + pub(super) defaults: HashMap<String, String>, + } + + #[derive(Debug, Serialize, Deserialize)] + pub(super) struct ErrorResponse { + error: ErrorModel, + } + + impl From<ErrorResponse> for Error { + fn from(resp: ErrorResponse) -> Error { + resp.error.into() + } + } + + #[derive(Debug, Serialize, Deserialize)] + pub(super) struct ErrorModel { + pub(super) message: String, + pub(super) r#type: String, + pub(super) code: i32, Review Comment: Nit: We use u16 for the error code in other places. ```suggestion pub(super) code: u16, ``` ########## crates/catalog/rest/src/catalog.rs: ########## @@ -0,0 +1,845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains rest catalog implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use reqwest::{Client, Request}; +use serde::de::DeserializeOwned; +use typed_builder::TypedBuilder; +use urlencoding::encode; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use self::_serde::{ + CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse, + NamespaceSerde, RenameTableRequest, NO_CONTENT, OK, +}; + +const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; Review Comment: The intent of this header is mostly debugging ########## crates/catalog/rest/src/catalog.rs: ########## @@ -0,0 +1,845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains rest catalog implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use reqwest::{Client, Request}; +use serde::de::DeserializeOwned; +use typed_builder::TypedBuilder; +use urlencoding::encode; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use self::_serde::{ + CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse, + NamespaceSerde, RenameTableRequest, NO_CONTENT, OK, +}; + +const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; Review Comment: For example, NamespaceExists was added recently: https://github.com/apache/iceberg/pull/8569 ########## crates/catalog/rest/src/catalog.rs: ########## @@ -0,0 +1,845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains rest catalog implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use reqwest::{Client, Request}; +use serde::de::DeserializeOwned; +use typed_builder::TypedBuilder; +use urlencoding::encode; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use self::_serde::{ + CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse, + NamespaceSerde, RenameTableRequest, NO_CONTENT, OK, +}; + +const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; +const PATH_V1: &str = "v1"; + +/// Rest catalog configuration. 
+#[derive(Debug, TypedBuilder)] +pub struct RestCatalogConfig { + uri: String, + #[builder(default, setter(strip_option))] + warehouse: Option<String>, + + #[builder(default)] + props: HashMap<String, String>, +} + +impl RestCatalogConfig { + fn config_endpoint(&self) -> String { + [&self.uri, PATH_V1, "config"].join("/") + } + + fn namespaces_endpoint(&self) -> String { + [&self.uri, PATH_V1, "namespaces"].join("/") + } + + fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String { + [&self.uri, PATH_V1, "namespaces", &ns.encode_in_url()].join("/") + } + + fn tables_endpoint(&self, ns: &NamespaceIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &ns.encode_in_url(), + "tables", + ] + .join("/") + } + + fn rename_table_endpoint(&self) -> String { + [&self.uri, PATH_V1, "tables", "rename"].join("/") + } + + fn table_endpoint(&self, table: &TableIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &table.namespace.encode_in_url(), + "tables", + encode(&table.name).as_ref(), + ] + .join("/") + } + + fn try_create_rest_client(&self) -> Result<HttpClient> { + //TODO: We will add oauth, ssl config, sigv4 later + let mut headers = HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + headers.insert( + HeaderName::from_static("x-client-version"), + HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION), + ); + headers.insert( + header::USER_AGENT, + HeaderValue::from_str(&format!("iceberg-rs/{}", env!("CARGO_PKG_VERSION"))).unwrap(), + ); + + Ok(HttpClient( + Client::builder().default_headers(headers).build()?, + )) + } +} + +struct HttpClient(Client); + +impl HttpClient { + async fn query< + R: DeserializeOwned, + E: DeserializeOwned + Into<Error>, + const SUCCESS_CODE: u16, + >( + &self, + request: Request, + ) -> Result<R> { + let resp = self.0.execute(request).await?; + + if resp.status().as_u16() == SUCCESS_CODE { + let text = resp.bytes().await?; + 
Ok(serde_json::from_slice::<R>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?) + } else { + let text = resp.bytes().await?; + let e = serde_json::from_slice::<E>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?; + Err(e.into()) + } + } + + async fn execute<E: DeserializeOwned + Into<Error>, const SUCCESS_CODE: u16>( + &self, + request: Request, + ) -> Result<()> { + let resp = self.0.execute(request).await?; + + if resp.status().as_u16() == SUCCESS_CODE { + Ok(()) + } else { + let text = resp.bytes().await?; + let e = serde_json::from_slice::<E>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?; + Err(e.into()) + } + } +} + +/// Rest catalog implementation. +pub struct RestCatalog { + config: RestCatalogConfig, + client: HttpClient, +} + +#[async_trait] +impl Catalog for RestCatalog { + /// List namespaces from table. + async fn list_namespaces( + &self, + parent: Option<&NamespaceIdent>, + ) -> Result<Vec<NamespaceIdent>> { + let mut request = self.client.0.get(self.config.namespaces_endpoint()); + if let Some(ns) = parent { + request = request.query(&[("parent", ns.encode_in_url())]); + } + + let resp = self + .client + .query::<ListNamespaceResponse, ErrorModel, OK>(request.build()?) + .await?; + + resp.namespaces + .into_iter() + .map(NamespaceIdent::from_vec) + .collect::<Result<Vec<NamespaceIdent>>>() + } + + /// Create a new namespace inside the catalog. 
+ async fn create_namespace( + &self, + namespace: &NamespaceIdent, + properties: HashMap<String, String>, + ) -> Result<Namespace> { + let request = self + .client + .0 + .post(self.config.namespaces_endpoint()) + .json(&NamespaceSerde { + namespace: namespace.as_ref().clone(), + properties: Some(properties), + }) + .build()?; + + let resp = self + .client + .query::<NamespaceSerde, ErrorModel, OK>(request) + .await?; + + Namespace::try_from(resp) + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> { + let request = self + .client + .0 + .get(self.config.namespace_endpoint(namespace)) + .build()?; + + let resp = self + .client + .query::<NamespaceSerde, ErrorModel, OK>(request) + .await?; + Namespace::try_from(resp) + } + + /// Update a namespace inside the catalog. + /// + /// # Behavior + /// + /// The properties must be the full set of namespace. + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap<String, String>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Updating namespace not supported yet!", + )) + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let request = self + .client + .0 + .delete(self.config.namespace_endpoint(namespace)) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// List tables from namespace. + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> { + let request = self + .client + .0 + .get(self.config.tables_endpoint(namespace)) + .build()?; + + let resp = self + .client + .query::<ListTableResponse, ErrorModel, OK>(request) + .await?; + + Ok(resp.identifiers) + } + + /// Create a new table inside the namespace. 
+ async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> Result<Table> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Creating table not supported yet!", + )) + } + + /// Load table from the catalog. + async fn load_table(&self, _table: &TableIdent) -> Result<Table> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Creating table not supported yet!", + )) + } + + /// Drop a table from the catalog. + async fn drop_table(&self, table: &TableIdent) -> Result<()> { + let request = self + .client + .0 + .delete(self.config.table_endpoint(table)) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// Check if a table exists in the catalog. + async fn stat_table(&self, table: &TableIdent) -> Result<bool> { + let request = self + .client + .0 + .head(self.config.table_endpoint(table)) + .build()?; + + self.client + .execute::<ErrorModel, NO_CONTENT>(request) + .await + .map(|_| true) + } + + /// Rename a table in the catalog. + async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { + let request = self + .client + .0 + .post(self.config.rename_table_endpoint()) + .json(&RenameTableRequest { + source: src.clone(), + destination: dest.clone(), + }) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// Update a table to the catalog. + async fn update_table(&self, _table: &TableIdent, _commit: TableCommit) -> Result<Table> { + todo!() + } + + /// Update multiple tables to the catalog as an atomic operation. + async fn update_tables(&self, _tables: &[(TableIdent, TableCommit)]) -> Result<()> { + todo!() + } +} + +impl RestCatalog { + /// Creates a rest catalog from config. 
+ pub async fn new(config: RestCatalogConfig) -> Result<Self> { + let mut catalog = Self { + client: config.try_create_rest_client()?, + config, + }; + + catalog.update_config().await?; + catalog.client = catalog.config.try_create_rest_client()?; + + Ok(catalog) + } + + async fn update_config(&mut self) -> Result<()> { + let mut request = self.client.0.get(self.config.config_endpoint()); + + if let Some(warehouse_location) = &self.config.warehouse { + request = request.query(&[("warehouse", warehouse_location)]); + } + let mut config = self + .client + .query::<CatalogConfig, ErrorResponse, OK>(request.build()?) + .await?; + + config.defaults.extend(self.config.props.clone()); Review Comment: This is correct, but it looks a bit odd because you overwrite the default (which is fine) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org