liurenjie1024 commented on code in PR #610: URL: https://github.com/apache/iceberg-rust/pull/610#discussion_r1779838127
########## crates/catalog/sql/src/catalog.rs: ########## @@ -472,36 +479,287 @@ impl Catalog for SqlCatalog { } } - async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { - todo!() + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let exists = self.namespace_exists(namespace).await?; + if exists { + // if there are tables in the namespace, don't allow drop. + let tables = self.list_tables(namespace).await?; + if !tables.is_empty() { + return Err(Error::new( + iceberg::ErrorKind::Unexpected, + format!( + "Namespace {:?} is not empty. {} tables exist.", + namespace, + tables.len() + ), + )); + } + + self.execute( + &format!("DELETE FROM {NAMESPACE_TABLE_NAME} WHERE {NAMESPACE_FIELD_NAME} = ?"), Review Comment: We should also include the catalog name in the filter. ########## crates/catalog/sql/src/catalog.rs: ########## @@ -472,36 +479,287 @@ impl Catalog for SqlCatalog { } } - async fn drop_namespace(&self, _namespace: &NamespaceIdent) -> Result<()> { - todo!() + async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> { + let exists = self.namespace_exists(namespace).await?; + if exists { + // if there are tables in the namespace, don't allow drop. + let tables = self.list_tables(namespace).await?; + if !tables.is_empty() { + return Err(Error::new( + iceberg::ErrorKind::Unexpected, + format!( + "Namespace {:?} is not empty.
{} tables exist.", + namespace, + tables.len() + ), + )); + } + + self.execute( + &format!("DELETE FROM {NAMESPACE_TABLE_NAME} WHERE {NAMESPACE_FIELD_NAME} = ?"), + vec![Some(&namespace.join("."))], + None, + ) + .await?; + + Ok(()) + } else { + no_such_namespace_err(namespace) + } } - async fn list_tables(&self, _namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> { - todo!() + async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> { + let exists = self.namespace_exists(namespace).await?; + if exists { + let rows = self + .fetch_rows( + &format!( + "SELECT {CATALOG_FIELD_TABLE_NAME}, + {CATALOG_FIELD_TABLE_NAMESPACE} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_CATALOG_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )", + ), + vec![Some(&namespace.join(".")), Some(&self.name)], + ) + .await?; + + let mut tables = HashSet::<TableIdent>::with_capacity(rows.len()); + + for row in rows.iter() { + let tbl = row + .try_get::<String, _>(CATALOG_FIELD_TABLE_NAME) + .map_err(from_sqlx_error)?; + let ns_strs = row + .try_get::<String, _>(CATALOG_FIELD_TABLE_NAMESPACE) + .map_err(from_sqlx_error)?; + let ns = NamespaceIdent::from_strs(ns_strs.split("."))?; + tables.insert(TableIdent::new(ns, tbl)); + } + + Ok(tables.into_iter().collect::<Vec<TableIdent>>()) + } else { + no_such_namespace_err(namespace) + } } - async fn table_exists(&self, _identifier: &TableIdent) -> Result<bool> { - todo!() + async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> { + let namespace = identifier.namespace().join("."); + let table_name = identifier.name(); + let table_counts = self + .fetch_rows( + &format!( + "SELECT 1 + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? 
+ AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + vec![Some(&namespace), Some(&self.name), Some(table_name)], + ) + .await?; + + if !table_counts.is_empty() { + Ok(true) + } else { + Ok(false) + } } - async fn drop_table(&self, _identifier: &TableIdent) -> Result<()> { - todo!() + async fn drop_table(&self, identifier: &TableIdent) -> Result<()> { + if !self.table_exists(identifier).await? { + return no_such_table_err(identifier); + } + + self.execute( + &format!( + "DELETE FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + vec![ + Some(&self.name), + Some(identifier.name()), + Some(&identifier.namespace().join(".")), + ], + None, + ) + .await?; + + Ok(()) } - async fn load_table(&self, _identifier: &TableIdent) -> Result<Table> { - todo!() + async fn load_table(&self, identifier: &TableIdent) -> Result<Table> { + if !self.table_exists(identifier).await? { + return no_such_table_err(identifier); + } + + let rows = self + .fetch_rows( + &format!( + "SELECT {CATALOG_FIELD_METADATA_LOCATION_PROP} + FROM {CATALOG_TABLE_NAME} + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? 
+ AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + vec![ + Some(&self.name), + Some(identifier.name()), + Some(&identifier.namespace().join(".")), + ], + ) + .await?; + + if rows.is_empty() { + return no_such_table_err(identifier); + } + + let row = &rows[0]; + let tbl_metadata_location = row + .try_get::<String, _>(CATALOG_FIELD_METADATA_LOCATION_PROP) + .map_err(from_sqlx_error)?; + + let file = self.fileio.new_input(&tbl_metadata_location)?; + let metadata_content = file.read().await?; + let metadata = serde_json::from_slice::<TableMetadata>(&metadata_content)?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .identifier(identifier.clone()) + .metadata_location(tbl_metadata_location) + .metadata(metadata) + .build()?) } async fn create_table( &self, - _namespace: &NamespaceIdent, - _creation: TableCreation, + namespace: &NamespaceIdent, + creation: TableCreation, ) -> Result<Table> { - todo!() + if !self.namespace_exists(namespace).await? { + return no_such_namespace_err(namespace); + } + + let tbl_name = creation.name.clone(); + let tbl_ident = TableIdent::new(namespace.clone(), tbl_name.clone()); + + if self.table_exists(&tbl_ident).await? 
{ + return table_already_exists_err(&tbl_ident); + } + + let (tbl_creation, location) = match creation.location.clone() { + Some(location) => (creation, location), + None => { + // fall back to namespace-specific location + // and then to warehouse location + let nsp_properties = self.get_namespace(namespace).await?.properties().clone(); + let nsp_location = match nsp_properties.get(NAMESPACE_LOCATION_PROPERTY_KEY) { + Some(location) => location.clone(), + None => { + format!( + "{}/{}", + self.warehouse_location.clone(), + namespace.join("/") + ) + } + }; + + let tbl_location = format!("{}/{}", nsp_location, tbl_ident.name()); + + ( + TableCreation { + location: Some(tbl_location.clone()), + ..creation + }, + tbl_location, + ) + } + }; + + let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?.build()?; + let tbl_metadata_location = format!( + "{}/metadata/0-{}.metadata.json", + location.clone(), + Uuid::new_v4() + ); + + let file = self.fileio.new_output(&tbl_metadata_location)?; + file.write(serde_json::to_vec(&tbl_metadata)?.into()) + .await?; + + self.execute(&format!( + "INSERT INTO {CATALOG_TABLE_NAME} + ({CATALOG_FIELD_CATALOG_NAME}, {CATALOG_FIELD_TABLE_NAMESPACE}, {CATALOG_FIELD_TABLE_NAME}, {CATALOG_FIELD_METADATA_LOCATION_PROP}, {CATALOG_FIELD_RECORD_TYPE}) + VALUES (?, ?, ?, ?, ?) + "), vec![Some(&self.name), Some(&namespace.join(".")), Some(&tbl_name.clone()), Some(&tbl_metadata_location), Some(CATALOG_FIELD_TABLE_RECORD_TYPE)], None).await?; + + Ok(Table::builder() + .file_io(self.fileio.clone()) + .metadata_location(tbl_metadata_location) + .identifier(tbl_ident) + .metadata(tbl_metadata) + .build()?) } - async fn rename_table(&self, _src: &TableIdent, _dest: &TableIdent) -> Result<()> { - todo!() + async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> { + if src == dest { + return Ok(()); + } + + if !self.table_exists(src).await? 
{ + return no_such_table_err(src); + } + + if !self.namespace_exists(dest.namespace()).await? { + return no_such_namespace_err(dest.namespace()); + } + + if self.table_exists(dest).await? { + return table_already_exists_err(dest); + } + + self.execute( + &format!( + "UPDATE {CATALOG_TABLE_NAME} + SET {CATALOG_FIELD_TABLE_NAME} = ?, {CATALOG_FIELD_TABLE_NAMESPACE} = ? + WHERE {CATALOG_FIELD_CATALOG_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAME} = ? + AND {CATALOG_FIELD_TABLE_NAMESPACE} = ? + AND ( + {CATALOG_FIELD_RECORD_TYPE} = '{CATALOG_FIELD_TABLE_RECORD_TYPE}' + OR {CATALOG_FIELD_RECORD_TYPE} IS NULL + )" + ), + vec![ + Some(dest.name()), + Some(&dest.namespace().join(".")), + Some(&self.name), + Some(src.name()), + Some(&src.namespace().join(".")), + ], + None, + ) + .await?; + + Ok(()) } async fn update_table(&self, _commit: TableCommit) -> Result<Table> { Review Comment: Not related to this PR, but should we return a NotImplemented error rather than panic? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org For additional commands, e-mail: issues-h...@iceberg.apache.org