liurenjie1024 commented on code in PR #78: URL: https://github.com/apache/iceberg-rust/pull/78#discussion_r1365327942
########## crates/catalog/rest/src/catalog.rs: ########## @@ -0,0 +1,845 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This module contains rest catalog implementation. + +use std::collections::HashMap; + +use async_trait::async_trait; +use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; +use reqwest::{Client, Request}; +use serde::de::DeserializeOwned; +use typed_builder::TypedBuilder; +use urlencoding::encode; + +use iceberg::table::Table; +use iceberg::Result; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, TableCommit, TableCreation, TableIdent, +}; + +use self::_serde::{ + CatalogConfig, ErrorModel, ErrorResponse, ListNamespaceResponse, ListTableResponse, + NamespaceSerde, RenameTableRequest, NO_CONTENT, OK, +}; + +const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; +const PATH_V1: &str = "v1"; + +/// Rest catalog configuration. +#[derive(Debug, TypedBuilder)] +pub struct RestCatalogConfig { + uri: String, + #[builder(default, setter(strip_option))] + warehouse: Option<String>, + + #[builder(default)] + props: HashMap<String, String>, +} + +impl RestCatalogConfig { + fn config_endpoint(&self) -> String { + [&self.uri, PATH_V1, "config"].join("/") + } + + fn namespaces_endpoint(&self) -> String { + [&self.uri, PATH_V1, "namespaces"].join("/") + } + + fn namespace_endpoint(&self, ns: &NamespaceIdent) -> String { + [&self.uri, PATH_V1, "namespaces", &ns.encode_in_url()].join("/") + } + + fn tables_endpoint(&self, ns: &NamespaceIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &ns.encode_in_url(), + "tables", + ] + .join("/") + } + + fn rename_table_endpoint(&self) -> String { + [&self.uri, PATH_V1, "tables", "rename"].join("/") + } + + fn table_endpoint(&self, table: &TableIdent) -> String { + [ + &self.uri, + PATH_V1, + "namespaces", + &table.namespace.encode_in_url(), + "tables", + encode(&table.name).as_ref(), + ] + .join("/") + } + + fn try_create_rest_client(&self) -> Result<HttpClient> { + //TODO: We will add oauth, ssl config, sigv4 later + let mut headers = HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + headers.insert( + HeaderName::from_static("x-client-version"), + HeaderValue::from_static(ICEBERG_REST_SPEC_VERSION), + ); + headers.insert( + header::USER_AGENT, + HeaderValue::from_str(&format!("iceberg-rs/{}", env!("CARGO_PKG_VERSION"))).unwrap(), + ); + + Ok(HttpClient( + Client::builder().default_headers(headers).build()?, + )) + } +} + +struct HttpClient(Client); + +impl HttpClient { + async fn query< + R: DeserializeOwned, + E: DeserializeOwned + Into<Error>, + const SUCCESS_CODE: u16, + >( + &self, + request: Request, + ) -> Result<R> { + let resp = self.0.execute(request).await?; + + if resp.status().as_u16() == SUCCESS_CODE { + let text = resp.bytes().await?; + Ok(serde_json::from_slice::<R>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?) + } else { + let text = resp.bytes().await?; + let e = serde_json::from_slice::<E>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?; + Err(e.into()) + } + } + + async fn execute<E: DeserializeOwned + Into<Error>, const SUCCESS_CODE: u16>( + &self, + request: Request, + ) -> Result<()> { + let resp = self.0.execute(request).await?; + + if resp.status().as_u16() == SUCCESS_CODE { + Ok(()) + } else { + let text = resp.bytes().await?; + let e = serde_json::from_slice::<E>(&text).map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to parse response from rest catalog server!", + ) + .with_context("json", String::from_utf8_lossy(&text)) + .with_source(e) + })?; + Err(e.into()) + } + } +} + +/// Rest catalog implementation. +pub struct RestCatalog { + config: RestCatalogConfig, + client: HttpClient, +} + +#[async_trait] +impl Catalog for RestCatalog { + /// List namespaces from table. + async fn list_namespaces( + &self, + parent: Option<&NamespaceIdent>, + ) -> Result<Vec<NamespaceIdent>> { + let mut request = self.client.0.get(self.config.namespaces_endpoint()); + if let Some(ns) = parent { + request = request.query(&[("parent", ns.encode_in_url())]); + } + + let resp = self + .client + .query::<ListNamespaceResponse, ErrorModel, OK>(request.build()?) + .await?; + + resp.namespaces + .into_iter() + .map(NamespaceIdent::from_vec) + .collect::<Result<Vec<NamespaceIdent>>>() + } + + /// Create a new namespace inside the catalog. + async fn create_namespace( + &self, + namespace: &NamespaceIdent, + properties: HashMap<String, String>, + ) -> Result<Namespace> { + let request = self + .client + .0 + .post(self.config.namespaces_endpoint()) + .json(&NamespaceSerde { + namespace: namespace.as_ref().clone(), + properties: Some(properties), + }) + .build()?; + + let resp = self + .client + .query::<NamespaceSerde, ErrorModel, OK>(request) + .await?; + + Namespace::try_from(resp) + } + + /// Get a namespace information from the catalog. + async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> { + let request = self + .client + .0 + .get(self.config.namespace_endpoint(namespace)) + .build()?; + + let resp = self + .client + .query::<NamespaceSerde, ErrorModel, OK>(request) + .await?; + Namespace::try_from(resp) + } + + /// Update a namespace inside the catalog. + /// + /// # Behavior + /// + /// The properties must be the full set of namespace. + async fn update_namespace( + &self, + _namespace: &NamespaceIdent, + _properties: HashMap<String, String>, + ) -> Result<()> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Updating namespace not supported yet!", + )) + } + + /// Drop a namespace from the catalog. + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let request = self + .client + .0 + .delete(self.config.namespace_endpoint(namespace)) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// List tables from namespace. + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> { + let request = self + .client + .0 + .get(self.config.tables_endpoint(namespace)) + .build()?; + + let resp = self + .client + .query::<ListTableResponse, ErrorModel, OK>(request) + .await?; + + Ok(resp.identifiers) + } + + /// Create a new table inside the namespace. + async fn create_table( + &self, + _namespace: &NamespaceIdent, + _creation: TableCreation, + ) -> Result<Table> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Creating table not supported yet!", + )) + } + + /// Load table from the catalog. + async fn load_table(&self, _table: &TableIdent) -> Result<Table> { + Err(Error::new( + ErrorKind::FeatureUnsupported, + "Creating table not supported yet!", + )) + } + + /// Drop a table from the catalog. + async fn drop_table(&self, table: &TableIdent) -> Result<()> { + let request = self + .client + .0 + .delete(self.config.table_endpoint(table)) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// Check if a table exists in the catalog. + async fn stat_table(&self, table: &TableIdent) -> Result<bool> { + let request = self + .client + .0 + .head(self.config.table_endpoint(table)) + .build()?; + + self.client + .execute::<ErrorModel, NO_CONTENT>(request) + .await + .map(|_| true) + } + + /// Rename a table in the catalog. + async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { + let request = self + .client + .0 + .post(self.config.rename_table_endpoint()) + .json(&RenameTableRequest { + source: src.clone(), + destination: dest.clone(), + }) + .build()?; + + self.client.execute::<ErrorModel, NO_CONTENT>(request).await + } + + /// Update a table to the catalog. + async fn update_table(&self, _table: &TableIdent, _commit: TableCommit) -> Result<Table> { + todo!() + } + + /// Update multiple tables to the catalog as an atomic operation. + async fn update_tables(&self, _tables: &[(TableIdent, TableCommit)]) -> Result<()> { + todo!() + } Review Comment: As I have said in pr description, the goal of this pr is to implement simple apis. Others will be left in following pr. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org