nastra commented on code in PR #8857:
URL: https://github.com/apache/iceberg/pull/8857#discussion_r1385038063


##########
nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java:
##########
@@ -78,10 +81,19 @@ public void testListNamespaces() {
 
     Assertions.assertThat(catalog.listNamespaces())
         .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+
+    api.deleteBranch().branch((Branch) 
api.getReference().refName(branch).get()).delete();
+
+    Assertions.assertThatThrownBy(() -> catalog.listNamespaces())
+        .hasMessageContaining(
+            "Cannot list Namespaces starting from '': ref '%s' is no longer 
valid", branch);

Review Comment:
   minor: having `''` looks a bit weird in the error message.



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,230 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
+
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+
+      try {
+

Review Comment:
   newlines around `commitRetry` can be removed



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java:
##########
@@ -91,6 +109,361 @@ public void testWithReferenceAfterRecreatingBranch()
     Assertions.assertThat(client.withReference(branch, 
null)).isNotEqualTo(client);
   }
 
+  @Test
+  public void testCreateNamespace() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "createNamespaceBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.empty(), Map.of()))
+        .hasMessageContaining("Creating empty namespaces is not supported");
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "b"), Map.of()))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Cannot create Namespace 'a.b': parent namespace 
'a' does not exist");
+
+    Namespace ns = Namespace.of("a");
+    client.createNamespace(ns, Map.of());
+    Assertions.assertThat(client.listNamespaces(ns)).isNotNull();
+
+    List<LogResponse.LogEntry> entries =
+        client.getApi().getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .satisfies(
+            entry -> {
+              Assertions.assertThat(entry.getCommitMeta().getMessage())
+                  .contains("create namespace a");
+              
Assertions.assertThat(entry.getCommitMeta().getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(entry.getCommitMeta().getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+
+    // test cases where a conflicting key is added by this client
+
+    Assertions.assertThatThrownBy(() -> client.createNamespace(ns, Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Namespace already exists: 'a'");
+
+    Schema schema =
+        new Schema(Types.StructType.of(required(1, "id", 
Types.LongType.get())).fields());
+    TableMetadata table1 =
+        TableMetadata.newTableMetadata(
+            schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), null, 
Map.of());
+    client.commitTable(
+        null, table1, "file:///tmp/iceberg", (String) null, ContentKey.of("a", 
"tbl"));
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "tbl"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Another content object with name 'a.tbl' 
already exists");
+
+    // test cases where a conflicting key is added by another client
+
+    api.commitMultipleOperations()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .commitMeta(NessieUtil.buildCommitMetadata("create namespace b", 
catalogOptions))
+        .operation(
+            Operation.Put.of(
+                ContentKey.of("b"), 
org.projectnessie.model.Namespace.of(ContentKey.of("b"))))
+        .commit();
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("b"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Namespace already exists: 'b'");
+
+    IcebergTable table2 = IcebergTable.of("file:///tmp/iceberg", 1, 1, 1, 1);
+    api.commitMultipleOperations()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .commitMeta(NessieUtil.buildCommitMetadata("create table a.tbl2", 
catalogOptions))
+        .operation(Operation.Put.of(ContentKey.of("a", "tbl2"), table2))
+        .commit();
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "tbl2"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Another content object with name 'a.tbl2' 
already exists");
+
+    client
+        .getApi()
+        .deleteBranch()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .delete();
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("c"), Map.of()))
+        .hasMessageContaining(
+            "Cannot create Namespace 'c': ref 'createNamespaceBranch' is no 
longer valid");
+  }
+
+  @Test
+  public void testDropNamespace() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "dropNamespaceBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    Namespace parent = Namespace.of("a");
+    Namespace child = Namespace.of("a", "b");
+
+    Assertions.assertThat(client.dropNamespace(parent)).isFalse();
+    Assertions.assertThat(client.dropNamespace(child)).isFalse();
+
+    client.createNamespace(parent, Map.of());
+    client.createNamespace(child, Map.of());
+
+    Assertions.assertThat(client.dropNamespace(child)).isTrue();
+
+    List<LogResponse.LogEntry> entries =
+        client.getApi().getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .satisfies(
+            entry -> {
+              Assertions.assertThat(entry.getCommitMeta().getMessage())
+                  .contains("drop namespace a.b");
+              
Assertions.assertThat(entry.getCommitMeta().getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(entry.getCommitMeta().getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+
+    client.createNamespace(child, Map.of());
+
+    Assertions.assertThatThrownBy(() -> client.dropNamespace(parent))
+        .hasMessageContaining("Namespace 'a' is not empty.");
+
+    IcebergTable table2 = IcebergTable.of("file:///tmp/iceberg", 1, 1, 1, 1);
+    api.commitMultipleOperations()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .commitMeta(NessieUtil.buildCommitMetadata("create table a.tbl2", 
catalogOptions))
+        .operation(Operation.Put.of(ContentKey.of("a", "tbl2"), table2))
+        .commit();
+
+    Assertions.assertThatThrownBy(() -> client.dropNamespace(Namespace.of("a", 
"tbl2")))
+        .hasMessageContaining(
+            "Cannot drop Namespace 'a.tbl2': Payload of existing and expected 
content for key 'a.tbl2' are different");
+
+    client
+        .getApi()
+        .deleteBranch()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .delete();
+
+    Assertions.assertThat(client.dropNamespace(child)).isFalse();
+  }
+
+  @Test
+  void testUpdateProperties() throws NessieConflictException, 
NessieNotFoundException {

Review Comment:
   same here, can these be split up into smaller tests?



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java:
##########
@@ -91,6 +109,361 @@ public void testWithReferenceAfterRecreatingBranch()
     Assertions.assertThat(client.withReference(branch, 
null)).isNotEqualTo(client);
   }
 
+  @Test

Review Comment:
   this is a fairly long test method. I would suggest splitting it up into 
multiple smaller test methods



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,230 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
+
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+
+      try {
+
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+
+      } catch (NessieReferenceConflictException e) {
+

Review Comment:
   this newline can be removed



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java:
##########
@@ -91,6 +109,361 @@ public void testWithReferenceAfterRecreatingBranch()
     Assertions.assertThat(client.withReference(branch, 
null)).isNotEqualTo(client);
   }
 
+  @Test
+  public void testCreateNamespace() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "createNamespaceBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.empty(), Map.of()))
+        .hasMessageContaining("Creating empty namespaces is not supported");
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "b"), Map.of()))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Cannot create Namespace 'a.b': parent namespace 
'a' does not exist");
+
+    Namespace ns = Namespace.of("a");
+    client.createNamespace(ns, Map.of());
+    Assertions.assertThat(client.listNamespaces(ns)).isNotNull();
+
+    List<LogResponse.LogEntry> entries =
+        client.getApi().getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .satisfies(
+            entry -> {
+              Assertions.assertThat(entry.getCommitMeta().getMessage())
+                  .contains("create namespace a");
+              
Assertions.assertThat(entry.getCommitMeta().getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(entry.getCommitMeta().getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+
+    // test cases where a conflicting key is added by this client
+
+    Assertions.assertThatThrownBy(() -> client.createNamespace(ns, Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Namespace already exists: 'a'");
+
+    Schema schema =
+        new Schema(Types.StructType.of(required(1, "id", 
Types.LongType.get())).fields());
+    TableMetadata table1 =
+        TableMetadata.newTableMetadata(
+            schema, PartitionSpec.unpartitioned(), SortOrder.unsorted(), null, 
Map.of());
+    client.commitTable(
+        null, table1, "file:///tmp/iceberg", (String) null, ContentKey.of("a", 
"tbl"));
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "tbl"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Another content object with name 'a.tbl' 
already exists");
+
+    // test cases where a conflicting key is added by another client
+
+    api.commitMultipleOperations()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .commitMeta(NessieUtil.buildCommitMetadata("create namespace b", 
catalogOptions))
+        .operation(
+            Operation.Put.of(
+                ContentKey.of("b"), 
org.projectnessie.model.Namespace.of(ContentKey.of("b"))))
+        .commit();
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("b"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Namespace already exists: 'b'");
+
+    IcebergTable table2 = IcebergTable.of("file:///tmp/iceberg", 1, 1, 1, 1);
+    api.commitMultipleOperations()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .commitMeta(NessieUtil.buildCommitMetadata("create table a.tbl2", 
catalogOptions))
+        .operation(Operation.Put.of(ContentKey.of("a", "tbl2"), table2))
+        .commit();
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "tbl2"), Map.of()))
+        .isInstanceOf(AlreadyExistsException.class)
+        .hasMessageContaining("Another content object with name 'a.tbl2' 
already exists");
+
+    client
+        .getApi()
+        .deleteBranch()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .delete();
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("c"), Map.of()))
+        .hasMessageContaining(
+            "Cannot create Namespace 'c': ref 'createNamespaceBranch' is no 
longer valid");
+  }
+
+  @Test
+  public void testDropNamespace() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "dropNamespaceBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    Namespace parent = Namespace.of("a");
+    Namespace child = Namespace.of("a", "b");
+
+    Assertions.assertThat(client.dropNamespace(parent)).isFalse();
+    Assertions.assertThat(client.dropNamespace(child)).isFalse();
+
+    client.createNamespace(parent, Map.of());
+    client.createNamespace(child, Map.of());
+
+    Assertions.assertThat(client.dropNamespace(child)).isTrue();
+
+    List<LogResponse.LogEntry> entries =
+        client.getApi().getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .satisfies(
+            entry -> {
+              Assertions.assertThat(entry.getCommitMeta().getMessage())
+                  .contains("drop namespace a.b");
+              
Assertions.assertThat(entry.getCommitMeta().getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(entry.getCommitMeta().getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+
+    client.createNamespace(child, Map.of());
+
+    Assertions.assertThatThrownBy(() -> client.dropNamespace(parent))
+        .hasMessageContaining("Namespace 'a' is not empty.");
+
+    IcebergTable table2 = IcebergTable.of("file:///tmp/iceberg", 1, 1, 1, 1);
+    api.commitMultipleOperations()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .commitMeta(NessieUtil.buildCommitMetadata("create table a.tbl2", 
catalogOptions))
+        .operation(Operation.Put.of(ContentKey.of("a", "tbl2"), table2))
+        .commit();
+
+    Assertions.assertThatThrownBy(() -> client.dropNamespace(Namespace.of("a", 
"tbl2")))
+        .hasMessageContaining(
+            "Cannot drop Namespace 'a.tbl2': Payload of existing and expected 
content for key 'a.tbl2' are different");
+
+    client
+        .getApi()
+        .deleteBranch()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .delete();
+
+    Assertions.assertThat(client.dropNamespace(child)).isFalse();
+  }
+
+  @Test
+  void testUpdateProperties() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "updatePropertiesBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    Namespace ns = Namespace.of("a");
+    ContentKey key = ContentKey.of("a");
+
+    client.createNamespace(ns, Map.of());
+
+    Assertions.assertThat(client.setProperties(ns, Map.of("k1", 
"v1a"))).isTrue();
+    org.projectnessie.model.Namespace content =
+        client
+            .getApi()
+            .getContent()
+            .key(key)
+            .reference(client.getApi().getReference().refName(branch).get())
+            .get()
+            .get(key)
+            .unwrap(org.projectnessie.model.Namespace.class)
+            .orElseThrow();
+    
Assertions.assertThat(content.getProperties()).hasSize(1).containsEntry("k1", 
"v1a");
+
+    List<LogResponse.LogEntry> entries =
+        client.getApi().getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .satisfies(
+            entry -> {
+              Assertions.assertThat(entry.getCommitMeta().getMessage())
+                  .contains("update namespace a");
+              
Assertions.assertThat(entry.getCommitMeta().getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(entry.getCommitMeta().getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+
+    Assertions.assertThat(client.setProperties(ns, Map.of("k1", "v1b", "k2", 
"v2"))).isTrue();
+    content =
+        client
+            .getApi()
+            .getContent()
+            .key(key)
+            .reference(client.getApi().getReference().refName(branch).get())
+            .get()
+            .get(key)
+            .unwrap(org.projectnessie.model.Namespace.class)
+            .orElseThrow();
+    Assertions.assertThat(content.getProperties())
+        .hasSize(2)
+        .containsEntry("k1", "v1b")
+        .containsEntry("k2", "v2");
+
+    Content nsUpdate =
+        org.projectnessie.model.Namespace.builder()
+            .from(content)
+            .properties(Map.of("k1", "v1c", "k2", "v2"))
+            .build();
+    api.commitMultipleOperations()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .commitMeta(NessieUtil.buildCommitMetadata("update namespace a", 
catalogOptions))
+        .operation(Operation.Put.of(key, nsUpdate))
+        .commit();
+
+    // will generate a conflict and a retry
+    Assertions.assertThat(client.setProperties(ns, Map.of("k3", 
"v3"))).isTrue();
+
+    content =
+        client
+            .getApi()
+            .getContent()
+            .key(key)
+            .reference(client.getApi().getReference().refName(branch).get())
+            .get()
+            .get(key)
+            .unwrap(org.projectnessie.model.Namespace.class)
+            .orElseThrow();
+    Assertions.assertThat(content.getProperties())
+        .hasSize(3)
+        .containsEntry("k1", "v1c")
+        .containsEntry("k2", "v2")
+        .containsEntry("k3", "v3");
+
+    api.commitMultipleOperations()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .commitMeta(NessieUtil.buildCommitMetadata("update namespace a", 
catalogOptions))
+        .operation(Operation.Delete.of(key))
+        .commit();
+
+    Assertions.assertThatThrownBy(() -> client.setProperties(ns, Map.of()))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Namespace does not exist: a");
+
+    client
+        .getApi()
+        .deleteBranch()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .delete();
+
+    Assertions.assertThatThrownBy(() -> 
client.setProperties(Namespace.of("c"), Map.of()))
+        .hasMessageContaining(
+            "Cannot update properties on Namespace 'c': ref 
'updatePropertiesBranch' is no longer valid");
+  }
+
+  @Test
+  void testRemoveProperties() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "removePropertiesBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    Namespace ns = Namespace.of("a");
+    ContentKey key = ContentKey.of("a");
+
+    client.createNamespace(ns, Map.of("k1", "v1", "k2", "v2"));
+
+    Assertions.assertThat(client.removeProperties(ns, Set.of("k1"))).isTrue();
+    org.projectnessie.model.Namespace content =
+        client
+            .getApi()
+            .getContent()
+            .key(key)
+            .reference(client.getApi().getReference().refName(branch).get())
+            .get()
+            .get(key)
+            .unwrap(org.projectnessie.model.Namespace.class)
+            .orElseThrow();
+    
Assertions.assertThat(content.getProperties()).hasSize(1).containsEntry("k2", 
"v2");
+
+    List<LogResponse.LogEntry> entries =
+        client.getApi().getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .satisfies(
+            entry -> {
+              Assertions.assertThat(entry.getCommitMeta().getMessage())
+                  .contains("update namespace a");
+              
Assertions.assertThat(entry.getCommitMeta().getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(entry.getCommitMeta().getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+
+    Content nsUpdate =
+        org.projectnessie.model.Namespace.builder()
+            .from(content)
+            .properties(Map.of("k2", "v2", "k3", "v3"))
+            .build();
+    api.commitMultipleOperations()
+        .branch((Branch) client.getApi().getReference().refName(branch).get())
+        .commitMeta(NessieUtil.buildCommitMetadata("update namespace a", 
catalogOptions))
+        .operation(Operation.Put.of(key, nsUpdate))
+        .commit();
+
+    // will generate a conflict and a retry

Review Comment:
   same as above



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,230 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
+
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+

Review Comment:
   newline can be removed



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,230 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
+
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+
+      try {
+
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+
+      } catch (NessieReferenceConflictException e) {
+
+        NessieConflictHandler.handleSingle(
+            e,
+            (conflictType, contentKey) -> {
+              switch (conflictType) {
+                case KEY_EXISTS:
+                  Content conflicting =
+                      
withReference(api.getContent()).key(contentKey).get().get(contentKey);
+                  throw namespaceAlreadyExists(contentKey, conflicting, e);
+                case NAMESPACE_ABSENT:
+                  throw new NoSuchNamespaceException(
+                      e,
+                      "Cannot create Namespace '%s': parent namespace '%s' 
does not exist",
+                      namespace,
+                      contentKey);
+                default:
+                  return false;
+              }
+            });
+        throw new RuntimeException(
+            String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()));
+      }
+
     } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot create Namespace '%s': " + "ref '%s' is no longer 
valid.",
+              "Cannot create Namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
   public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+

Review Comment:
   this newline can be removed



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,230 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
+
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+
+      try {
+
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+
+      } catch (NessieReferenceConflictException e) {
+
+        NessieConflictHandler.handleSingle(
+            e,
+            (conflictType, contentKey) -> {
+              switch (conflictType) {
+                case KEY_EXISTS:
+                  Content conflicting =
+                      
withReference(api.getContent()).key(contentKey).get().get(contentKey);
+                  throw namespaceAlreadyExists(contentKey, conflicting, e);
+                case NAMESPACE_ABSENT:
+                  throw new NoSuchNamespaceException(
+                      e,
+                      "Cannot create Namespace '%s': parent namespace '%s' 
does not exist",
+                      namespace,
+                      contentKey);
+                default:
+                  return false;
+              }
+            });
+        throw new RuntimeException(
+            String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()));
+      }
+
     } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot create Namespace '%s': " + "ref '%s' is no longer 
valid.",
+              "Cannot create Namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
   public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+
     try {
-      GetNamespacesResponse response =
-          withReference(
-                  getApi()
-                      .getMultipleNamespaces()
-                      
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-              .get();
-      return response.getNamespaces().stream()
-          .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
-          .filter(ns -> ns.length() == namespace.length() + 1)
+
+      org.projectnessie.model.Namespace root =
+          org.projectnessie.model.Namespace.of(namespace.levels());
+
+      String filter =
+          namespace.isEmpty()
+              ? "size(entry.keyElements) == 1"
+              : String.format(
+                  "size(entry.keyElements) == %d && 
entry.encodedKey.startsWith('%s.')",
+                  root.getElementCount() + 1, root.name());
+
+      List<ContentKey> entries =
+          withReference(api.getEntries()).filter(filter).stream()
+              .filter(e -> Content.Type.NAMESPACE.equals(e.getType()))
+              .map(EntriesResponse.Entry::getName)
+              .collect(Collectors.toList());
+
+      if (entries.isEmpty()) {
+        return Collections.emptyList();
+      }
+
+      GetContentBuilder getContent = withReference(api.getContent());
+      entries.forEach(getContent::key);
+
+      return getContent.get().values().stream()
+          .map(v -> v.unwrap(org.projectnessie.model.Namespace.class))
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .map(v -> Namespace.of(v.getElements().toArray(new String[0])))
+          .filter(v -> v.length() == namespace.length() + 1) // only direct 
children
           .collect(Collectors.toList());
-    } catch (NessieReferenceNotFoundException e) {
+
+    } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot list Namespaces starting from '%s': " + "ref '%s' is no 
longer valid.",
+              "Cannot list Namespaces starting from '%s': ref '%s' is no 
longer valid.",
               namespace, getRef().getName()),
           e);
     }
   }
 
   public boolean dropNamespace(Namespace namespace) throws 
NamespaceNotEmptyException {
+
+    getRef().checkMutable();
+    ContentKey key = ContentKey.of(namespace.levels());
+
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .deleteNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-          .delete();
-      refresh();
+
+      commitRetry("drop namespace " + key, Operation.Delete.of(key));
       return true;
-    } catch (NessieNamespaceNotFoundException e) {
-      return false;
+
+    } catch (NessieReferenceConflictException e) {
+

Review Comment:
   same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to