snazy commented on code in PR #8857:
URL: https://github.com/apache/iceberg/pull/8857#discussion_r1403377308

##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,206 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+      Map<ContentKey, Content> contentMap =

Review Comment:
   Nit: superfluous variable (inline the map into the line below) 



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,206 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+      try {
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+      } catch (NessieReferenceConflictException e) {
+        Optional<Conflict> conflict = NessieUtil.extractSingleConflict(e);
+        if (conflict.isPresent()) {
+          switch (conflict.get().conflictType()) {
+            case KEY_EXISTS:
+              Content conflicting = 
withReference(api.getContent()).key(key).get().get(key);
+              throw namespaceAlreadyExists(key, conflicting, e);
+            case NAMESPACE_ABSENT:
+              throw new NoSuchNamespaceException(
+                  e,
+                  "Cannot create Namespace '%s': parent namespace '%s' does 
not exist",
+                  namespace,
+                  conflict.get().key());
+          }
+        }
+        throw new RuntimeException(
+            String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()));
+      }
     } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot create Namespace '%s': " + "ref '%s' is no longer 
valid.",
+              "Cannot create Namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
   public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
     try {
-      GetNamespacesResponse response =
-          withReference(
-                  getApi()
-                      .getMultipleNamespaces()
-                      
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-              .get();
-      return response.getNamespaces().stream()
-          .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
-          .filter(ns -> ns.length() == namespace.length() + 1)
+      org.projectnessie.model.Namespace root =
+          org.projectnessie.model.Namespace.of(namespace.levels());
+      String filter =
+          "entry.contentType == 'NAMESPACE' &&"
+              + (namespace.isEmpty()
+                  ? "size(entry.keyElements) == 1"
+                  : String.format(
+                      "size(entry.keyElements) == %d && 
entry.encodedKey.startsWith('%s.')",
+                      root.getElementCount() + 1, root.name()));
+      List<ContentKey> entries =
+          withReference(api.getEntries()).filter(filter).stream()
+              .map(EntriesResponse.Entry::getName)
+              .collect(Collectors.toList());
+      if (entries.isEmpty()) {
+        return Collections.emptyList();
+      }
+      GetContentBuilder getContent = withReference(api.getContent());
+      entries.forEach(getContent::key);
+      return getContent.get().values().stream()
+          .map(v -> v.unwrap(org.projectnessie.model.Namespace.class))
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .map(v -> Namespace.of(v.getElements().toArray(new String[0])))
           .collect(Collectors.toList());
-    } catch (NessieReferenceNotFoundException e) {
+    } catch (NessieNotFoundException e) {
+      if (namespace.isEmpty()) {
+        throw new NoSuchNamespaceException(
+            e,
+            "Cannot list top-level Namespaces: ref '%s' is no longer valid.",
+            getRef().getName());
+      }
       throw new RuntimeException(
           String.format(
-              "Cannot list Namespaces starting from '%s': " + "ref '%s' is no 
longer valid.",
+              "Cannot list child Namespaces from '%s': ref '%s' is no longer 
valid.",
               namespace, getRef().getName()),
           e);
     }
   }
 
   public boolean dropNamespace(Namespace namespace) throws 
NamespaceNotEmptyException {
+    getRef().checkMutable();
+    ContentKey key = ContentKey.of(namespace.levels());
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .deleteNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-          .delete();
-      refresh();
-      return true;
-    } catch (NessieNamespaceNotFoundException e) {
-      return false;
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null && !unwrapNamespace(existing).isPresent()) {
+        throw new NoSuchNamespaceException(
+            "Content object with name '%s' is not a Namespace.", namespace);

Review Comment:
   Nit: use lower-case `namespace` in this message.



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,206 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+      try {
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+      } catch (NessieReferenceConflictException e) {
+        Optional<Conflict> conflict = NessieUtil.extractSingleConflict(e);
+        if (conflict.isPresent()) {
+          switch (conflict.get().conflictType()) {
+            case KEY_EXISTS:
+              Content conflicting = 
withReference(api.getContent()).key(key).get().get(key);
+              throw namespaceAlreadyExists(key, conflicting, e);
+            case NAMESPACE_ABSENT:
+              throw new NoSuchNamespaceException(
+                  e,
+                  "Cannot create Namespace '%s': parent namespace '%s' does 
not exist",
+                  namespace,
+                  conflict.get().key());
+          }
+        }
+        throw new RuntimeException(
+            String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()));
+      }
     } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot create Namespace '%s': " + "ref '%s' is no longer 
valid.",
+              "Cannot create Namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
   public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
     try {
-      GetNamespacesResponse response =
-          withReference(
-                  getApi()
-                      .getMultipleNamespaces()
-                      
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-              .get();
-      return response.getNamespaces().stream()
-          .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
-          .filter(ns -> ns.length() == namespace.length() + 1)
+      org.projectnessie.model.Namespace root =
+          org.projectnessie.model.Namespace.of(namespace.levels());
+      String filter =
+          "entry.contentType == 'NAMESPACE' &&"
+              + (namespace.isEmpty()
+                  ? "size(entry.keyElements) == 1"
+                  : String.format(
+                      "size(entry.keyElements) == %d && 
entry.encodedKey.startsWith('%s.')",
+                      root.getElementCount() + 1, root.name()));
+      List<ContentKey> entries =
+          withReference(api.getEntries()).filter(filter).stream()
+              .map(EntriesResponse.Entry::getName)
+              .collect(Collectors.toList());
+      if (entries.isEmpty()) {
+        return Collections.emptyList();
+      }
+      GetContentBuilder getContent = withReference(api.getContent());
+      entries.forEach(getContent::key);
+      return getContent.get().values().stream()
+          .map(v -> v.unwrap(org.projectnessie.model.Namespace.class))
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .map(v -> Namespace.of(v.getElements().toArray(new String[0])))
           .collect(Collectors.toList());
-    } catch (NessieReferenceNotFoundException e) {
+    } catch (NessieNotFoundException e) {
+      if (namespace.isEmpty()) {
+        throw new NoSuchNamespaceException(
+            e,
+            "Cannot list top-level Namespaces: ref '%s' is no longer valid.",
+            getRef().getName());
+      }
       throw new RuntimeException(

Review Comment:
   Should this also be a `NoSuchNamespaceException`?



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieUtil.java:
##########
@@ -165,4 +168,21 @@ public static TableMetadata 
updateTableMetadataWithNessieSpecificProperties(
 
     return builder.discardChanges().build();
   }
+
+  public static Optional<Conflict> 
extractSingleConflict(NessieReferenceConflictException ex) {

Review Comment:
   Optional:
   I _think_ it is currently fine to check for only one conflict, because the 
server _currently_ almost always returns only a single conflict.
   However, I'm not sure whether that will always be the case in the future.
   Could you add a parameter for the handled conflict types 
(`Collection<Conflict.ConflictType>`) and return the first matching one?



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNamespace.java:
##########
@@ -106,7 +106,7 @@ public void testCreatingAndDroppingNamespaceWithContent() 
throws NessieNotFoundE
 
     Assertions.assertThatThrownBy(() -> catalog.dropNamespace(namespace))
         .isInstanceOf(NamespaceNotEmptyException.class)
-        .hasMessage("Namespace 'test' is not empty. One or more tables 
exist.");
+        .hasMessageContaining("Namespace 'test' is not empty");

Review Comment:
   right



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java:
##########
@@ -91,6 +112,412 @@ public void testWithReferenceAfterRecreatingBranch()
     Assertions.assertThat(client.withReference(branch, 
null)).isNotEqualTo(client);
   }
 
+  @Test
+  public void testCreateNamespace() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "createNamespaceBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+    
Assertions.assertThat(client.listNamespaces(Namespace.of("a"))).isNotNull();
+
+    List<LogResponse.LogEntry> entries = 
api.getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .extracting(LogResponse.LogEntry::getCommitMeta)
+        .satisfies(
+            meta -> {
+              Assertions.assertThat(meta.getMessage()).contains("create 
namespace a");
+              
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(meta.getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+  }
+
+  @Test
+  public void testCreateNamespaceInvalid() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "createNamespaceInvalidBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.empty(), Map.of()))
+        .isInstanceOf(IllegalArgumentException.class)
+        .hasMessageContaining("Creating empty namespaces is not supported");
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.of("a", "b"), Map.of()))
+        .isInstanceOf(NoSuchNamespaceException.class)
+        .hasMessageContaining("Cannot create Namespace 'a.b': parent namespace 
'a' does not exist");
+  }
+

Review Comment:
   Nice bunch of additional test cases!



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,239 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
+
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+
+      try {
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+      } catch (NessieReferenceConflictException e) {
+        NessieConflictHandler.handleSingle(
+            e,
+            (conflictType, contentKey) -> {
+              switch (conflictType) {
+                case KEY_EXISTS:
+                  Content conflicting =
+                      
withReference(api.getContent()).key(contentKey).get().get(contentKey);
+                  throw namespaceAlreadyExists(contentKey, conflicting, e);
+                case NAMESPACE_ABSENT:
+                  throw new NoSuchNamespaceException(
+                      e,
+                      "Cannot create Namespace '%s': parent namespace '%s' 
does not exist",
+                      namespace,
+                      contentKey);
+                default:
+                  return false;
+              }
+            });
+        throw new RuntimeException(
+            String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()));
+      }
+
     } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot create Namespace '%s': " + "ref '%s' is no longer 
valid.",
+              "Cannot create Namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
   public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
+
     try {
-      GetNamespacesResponse response =
-          withReference(
-                  getApi()
-                      .getMultipleNamespaces()
-                      
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-              .get();
-      return response.getNamespaces().stream()
-          .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
-          .filter(ns -> ns.length() == namespace.length() + 1)
+      org.projectnessie.model.Namespace root =
+          org.projectnessie.model.Namespace.of(namespace.levels());
+
+      String filter =
+          namespace.isEmpty()
+              ? "size(entry.keyElements) == 1"
+              : String.format(
+                  "size(entry.keyElements) == %d && 
entry.encodedKey.startsWith('%s.')",
+                  root.getElementCount() + 1, root.name());
+
+      List<ContentKey> entries =
+          withReference(api.getEntries()).filter(filter).stream()
+              .filter(e -> Content.Type.NAMESPACE.equals(e.getType()))
+              .map(EntriesResponse.Entry::getName)
+              .collect(Collectors.toList());
+
+      if (entries.isEmpty()) {
+        return Collections.emptyList();
+      }
+
+      GetContentBuilder getContent = withReference(api.getContent());
+      entries.forEach(getContent::key);
+
+      return getContent.get().values().stream()
+          .map(v -> v.unwrap(org.projectnessie.model.Namespace.class))
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .map(v -> Namespace.of(v.getElements().toArray(new String[0])))
+          .filter(v -> v.length() == namespace.length() + 1) // only direct 
children
           .collect(Collectors.toList());
-    } catch (NessieReferenceNotFoundException e) {
+
+    } catch (NessieNotFoundException e) {
+      if (namespace.isEmpty()) {
+        throw new NoSuchNamespaceException(
+            e,
+            "Cannot list top-level Namespaces: ref '%s' is no longer valid.",
+            getRef().getName());
+      }
       throw new RuntimeException(
           String.format(
-              "Cannot list Namespaces starting from '%s': " + "ref '%s' is no 
longer valid.",
+              "Cannot list child Namespaces from '%s': ref '%s' is no longer 
valid.",
               namespace, getRef().getName()),
           e);
     }
   }
 
   public boolean dropNamespace(Namespace namespace) throws 
NamespaceNotEmptyException {
+
+    getRef().checkMutable();
+    ContentKey key = ContentKey.of(namespace.levels());
+
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .deleteNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-          .delete();
-      refresh();
-      return true;
-    } catch (NessieNamespaceNotFoundException e) {
-      return false;
+      Map<ContentKey, Content> contentMap =

Review Comment:
   Nice catch!



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestMultipleClients.java:
##########
@@ -78,10 +81,19 @@ public void testListNamespaces() {
 
     Assertions.assertThat(catalog.listNamespaces())
         .containsExactlyInAnyOrder(Namespace.of("db1"), Namespace.of("db2"));
+
+    api.deleteBranch().branch((Branch) 
api.getReference().refName(branch).get()).delete();
+
+    Assertions.assertThatThrownBy(() -> catalog.listNamespaces())

Review Comment:
   Can you add a test to validate the behavior when a namespace has been 
deleted (or against a non-existing namespace)? Same for "load namespace 
metadata" below - not sure whether that's been tackled in another test.



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java:
##########
@@ -91,6 +112,412 @@ public void testWithReferenceAfterRecreatingBranch()
     Assertions.assertThat(client.withReference(branch, 
null)).isNotEqualTo(client);
   }
 
+  @Test
+  public void testCreateNamespace() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "createNamespaceBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+    
Assertions.assertThat(client.listNamespaces(Namespace.of("a"))).isNotNull();
+
+    List<LogResponse.LogEntry> entries = 
api.getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .extracting(LogResponse.LogEntry::getCommitMeta)
+        .satisfies(
+            meta -> {
+              Assertions.assertThat(meta.getMessage()).contains("create 
namespace a");
+              
Assertions.assertThat(meta.getAuthor()).isEqualTo("iceberg-user");
+              Assertions.assertThat(meta.getProperties())
+                  .containsEntry(NessieUtil.APPLICATION_TYPE, "iceberg")
+                  .containsEntry(CatalogProperties.APP_ID, "iceberg-nessie");
+            });
+  }
+
+  @Test
+  public void testCreateNamespaceInvalid() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "createNamespaceInvalidBranch";
+    createBranch(branch);
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
Map.of());
+
+    Assertions.assertThatThrownBy(() -> 
client.createNamespace(Namespace.empty(), Map.of()))
+        .isInstanceOf(IllegalArgumentException.class)

Review Comment:
   Nit: `assertThatIllegalArgumentException`



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,206 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+      try {
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+      } catch (NessieReferenceConflictException e) {
+        Optional<Conflict> conflict = NessieUtil.extractSingleConflict(e);
+        if (conflict.isPresent()) {
+          switch (conflict.get().conflictType()) {
+            case KEY_EXISTS:
+              Content conflicting = 
withReference(api.getContent()).key(key).get().get(key);
+              throw namespaceAlreadyExists(key, conflicting, e);
+            case NAMESPACE_ABSENT:
+              throw new NoSuchNamespaceException(
+                  e,
+                  "Cannot create Namespace '%s': parent namespace '%s' does 
not exist",
+                  namespace,
+                  conflict.get().key());
+          }
+        }
+        throw new RuntimeException(
+            String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()));
+      }
     } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot create Namespace '%s': " + "ref '%s' is no longer 
valid.",
+              "Cannot create Namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
   public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
     try {
-      GetNamespacesResponse response =
-          withReference(
-                  getApi()
-                      .getMultipleNamespaces()
-                      
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-              .get();
-      return response.getNamespaces().stream()
-          .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
-          .filter(ns -> ns.length() == namespace.length() + 1)
+      org.projectnessie.model.Namespace root =
+          org.projectnessie.model.Namespace.of(namespace.levels());
+      String filter =
+          "entry.contentType == 'NAMESPACE' &&"

Review Comment:
   I suspect that change can be done in the Nessie Client* class as well 
(Nessie PR)



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,206 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+      try {
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+      } catch (NessieReferenceConflictException e) {
+        Optional<Conflict> conflict = NessieUtil.extractSingleConflict(e);
+        if (conflict.isPresent()) {
+          switch (conflict.get().conflictType()) {
+            case KEY_EXISTS:
+              Content conflicting = 
withReference(api.getContent()).key(key).get().get(key);
+              throw namespaceAlreadyExists(key, conflicting, e);
+            case NAMESPACE_ABSENT:
+              throw new NoSuchNamespaceException(
+                  e,
+                  "Cannot create Namespace '%s': parent namespace '%s' does 
not exist",
+                  namespace,
+                  conflict.get().key());
+          }
+        }
+        throw new RuntimeException(
+            String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()));
+      }
     } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot create Namespace '%s': " + "ref '%s' is no longer 
valid.",
+              "Cannot create Namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
   public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
     try {
-      GetNamespacesResponse response =
-          withReference(
-                  getApi()
-                      .getMultipleNamespaces()
-                      
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-              .get();
-      return response.getNamespaces().stream()
-          .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
-          .filter(ns -> ns.length() == namespace.length() + 1)
+      org.projectnessie.model.Namespace root =
+          org.projectnessie.model.Namespace.of(namespace.levels());
+      String filter =
+          "entry.contentType == 'NAMESPACE' &&"
+              + (namespace.isEmpty()
+                  ? "size(entry.keyElements) == 1"
+                  : String.format(
+                      "size(entry.keyElements) == %d && 
entry.encodedKey.startsWith('%s.')",
+                      root.getElementCount() + 1, root.name()));
+      List<ContentKey> entries =
+          withReference(api.getEntries()).filter(filter).stream()
+              .map(EntriesResponse.Entry::getName)
+              .collect(Collectors.toList());
+      if (entries.isEmpty()) {
+        return Collections.emptyList();
+      }
+      GetContentBuilder getContent = withReference(api.getContent());
+      entries.forEach(getContent::key);
+      return getContent.get().values().stream()
+          .map(v -> v.unwrap(org.projectnessie.model.Namespace.class))
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .map(v -> Namespace.of(v.getElements().toArray(new String[0])))
           .collect(Collectors.toList());
-    } catch (NessieReferenceNotFoundException e) {
+    } catch (NessieNotFoundException e) {
+      if (namespace.isEmpty()) {
+        throw new NoSuchNamespaceException(
+            e,
+            "Cannot list top-level Namespaces: ref '%s' is no longer valid.",
+            getRef().getName());
+      }
       throw new RuntimeException(
           String.format(
-              "Cannot list Namespaces starting from '%s': " + "ref '%s' is no 
longer valid.",
+              "Cannot list child Namespaces from '%s': ref '%s' is no longer 
valid.",
               namespace, getRef().getName()),
           e);
     }
   }
 
   public boolean dropNamespace(Namespace namespace) throws 
NamespaceNotEmptyException {
+    getRef().checkMutable();
+    ContentKey key = ContentKey.of(namespace.levels());
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .deleteNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-          .delete();
-      refresh();
-      return true;
-    } catch (NessieNamespaceNotFoundException e) {
-      return false;
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null && !unwrapNamespace(existing).isPresent()) {

Review Comment:
   Could you replace this with 
   ```java
         if (existing != null && 
!existing.getType().equals(Content.Type.NAMESPACE)) {
   ```
   I think that's a bit clearer



##########
nessie/src/main/java/org/apache/iceberg/nessie/NessieIcebergClient.java:
##########
@@ -181,133 +186,206 @@ public IcebergTable table(TableIdentifier 
tableIdentifier) {
   }
 
   public void createNamespace(Namespace namespace, Map<String, String> 
metadata) {
+    getRef().checkMutable();
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("Creating empty namespaces is not 
supported");
+    }
+    ContentKey key = ContentKey.of(namespace.levels());
+    org.projectnessie.model.Namespace content =
+        org.projectnessie.model.Namespace.of(key.getElements(), metadata);
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .createNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .properties(metadata))
-          .create();
-      refresh();
-    } catch (NessieNamespaceAlreadyExistsException e) {
-      throw new AlreadyExistsException(e, "Namespace already exists: %s", 
namespace);
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null) {
+        throw namespaceAlreadyExists(key, existing, null);
+      }
+      try {
+        commitRetry("create namespace " + key, Operation.Put.of(key, content));
+      } catch (NessieReferenceConflictException e) {
+        Optional<Conflict> conflict = NessieUtil.extractSingleConflict(e);
+        if (conflict.isPresent()) {
+          switch (conflict.get().conflictType()) {
+            case KEY_EXISTS:
+              Content conflicting = 
withReference(api.getContent()).key(key).get().get(key);
+              throw namespaceAlreadyExists(key, conflicting, e);
+            case NAMESPACE_ABSENT:
+              throw new NoSuchNamespaceException(
+                  e,
+                  "Cannot create Namespace '%s': parent namespace '%s' does 
not exist",
+                  namespace,
+                  conflict.get().key());
+          }
+        }
+        throw new RuntimeException(
+            String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()));
+      }
     } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot create Namespace '%s': " + "ref '%s' is no longer 
valid.",
+              "Cannot create Namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot create Namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
   }
 
   public List<Namespace> listNamespaces(Namespace namespace) throws 
NoSuchNamespaceException {
     try {
-      GetNamespacesResponse response =
-          withReference(
-                  getApi()
-                      .getMultipleNamespaces()
-                      
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-              .get();
-      return response.getNamespaces().stream()
-          .map(ns -> Namespace.of(ns.getElements().toArray(new String[0])))
-          .filter(ns -> ns.length() == namespace.length() + 1)
+      org.projectnessie.model.Namespace root =
+          org.projectnessie.model.Namespace.of(namespace.levels());
+      String filter =
+          "entry.contentType == 'NAMESPACE' &&"
+              + (namespace.isEmpty()
+                  ? "size(entry.keyElements) == 1"
+                  : String.format(
+                      "size(entry.keyElements) == %d && 
entry.encodedKey.startsWith('%s.')",
+                      root.getElementCount() + 1, root.name()));
+      List<ContentKey> entries =
+          withReference(api.getEntries()).filter(filter).stream()
+              .map(EntriesResponse.Entry::getName)
+              .collect(Collectors.toList());
+      if (entries.isEmpty()) {
+        return Collections.emptyList();
+      }
+      GetContentBuilder getContent = withReference(api.getContent());
+      entries.forEach(getContent::key);
+      return getContent.get().values().stream()
+          .map(v -> v.unwrap(org.projectnessie.model.Namespace.class))
+          .filter(Optional::isPresent)
+          .map(Optional::get)
+          .map(v -> Namespace.of(v.getElements().toArray(new String[0])))
           .collect(Collectors.toList());
-    } catch (NessieReferenceNotFoundException e) {
+    } catch (NessieNotFoundException e) {
+      if (namespace.isEmpty()) {
+        throw new NoSuchNamespaceException(
+            e,
+            "Cannot list top-level Namespaces: ref '%s' is no longer valid.",
+            getRef().getName());
+      }
       throw new RuntimeException(
           String.format(
-              "Cannot list Namespaces starting from '%s': " + "ref '%s' is no 
longer valid.",
+              "Cannot list child Namespaces from '%s': ref '%s' is no longer 
valid.",
               namespace, getRef().getName()),
           e);
     }
   }
 
   public boolean dropNamespace(Namespace namespace) throws 
NamespaceNotEmptyException {
+    getRef().checkMutable();
+    ContentKey key = ContentKey.of(namespace.levels());
     try {
-      getRef().checkMutable();
-      withReference(
-              getApi()
-                  .deleteNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-          .delete();
-      refresh();
-      return true;
-    } catch (NessieNamespaceNotFoundException e) {
-      return false;
+      Map<ContentKey, Content> contentMap =
+          api.getContent().reference(getReference()).key(key).get();
+      Content existing = contentMap.get(key);
+      if (existing != null && !unwrapNamespace(existing).isPresent()) {
+        throw new NoSuchNamespaceException(
+            "Content object with name '%s' is not a Namespace.", namespace);
+      }
+      try {
+        commitRetry("drop namespace " + key, Operation.Delete.of(key));
+        return true;
+      } catch (NessieReferenceConflictException e) {
+        Optional<Conflict> conflict = NessieUtil.extractSingleConflict(e);
+        if (conflict.isPresent()) {
+          Conflict.ConflictType conflictType = conflict.get().conflictType();
+          switch (conflictType) {
+            case KEY_DOES_NOT_EXIST:
+              return false;
+            case NAMESPACE_NOT_EMPTY:
+              throw new NamespaceNotEmptyException(e, "Namespace '%s' is not 
empty.", namespace);
+          }
+        }
+        throw new RuntimeException(
+            String.format("Cannot drop Namespace '%s': %s", namespace, 
e.getMessage()));
+      }
     } catch (NessieNotFoundException e) {
       LOG.error(
           "Cannot drop Namespace '{}': ref '{}' is no longer valid.",
           namespace,
           getRef().getName(),
           e);
-      return false;
-    } catch (NessieNamespaceNotEmptyException e) {
-      throw new NamespaceNotEmptyException(
-          e, "Namespace '%s' is not empty. One or more tables exist.", 
namespace);
+    } catch (BaseNessieClientServerException e) {
+      throw new RuntimeException(
+          String.format("Cannot drop Namespace '%s': %s", namespace, 
e.getMessage()), e);
     }
+    return false;
   }
 
   public Map<String, String> loadNamespaceMetadata(Namespace namespace)
       throws NoSuchNamespaceException {
+    ContentKey key = ContentKey.of(namespace.levels());
     try {
-      return withReference(
-              getApi()
-                  .getNamespace()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels())))
-          .get()
+      Map<ContentKey, Content> contentMap = 
withReference(api.getContent()).key(key).get();
+      return unwrapNamespace(contentMap.get(key))
+          .orElseThrow(
+              () -> new NoSuchNamespaceException("Namespace does not exist: 
%s", namespace))
           .getProperties();
-    } catch (NessieNamespaceNotFoundException e) {
-      throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", 
namespace);
-    } catch (NessieReferenceNotFoundException e) {
+    } catch (NessieNotFoundException e) {
       throw new RuntimeException(
           String.format(
-              "Cannot load Namespace '%s': " + "ref '%s' is no longer valid.",
+              "Cannot load Namespace '%s': ref '%s' is no longer valid.",
               namespace, getRef().getName()),
           e);
     }
   }
 
   public boolean setProperties(Namespace namespace, Map<String, String> 
properties) {
-    try {
-      withReference(
-              getApi()
-                  .updateProperties()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .updateProperties(properties))
-          .update();
-      refresh();
-      // always successful, otherwise an exception is thrown
-      return true;
-    } catch (NessieNamespaceNotFoundException e) {
-      throw new NoSuchNamespaceException(e, "Namespace does not exist: %s", 
namespace);
-    } catch (NessieNotFoundException e) {
-      throw new RuntimeException(
-          String.format(
-              "Cannot update properties on Namespace '%s': ref '%s' is no 
longer valid.",
-              namespace, getRef().getName()),
-          e);
-    }
+    return updateProperties(namespace, props -> props.putAll(properties));
   }
 
   public boolean removeProperties(Namespace namespace, Set<String> properties) 
{
+    return updateProperties(namespace, props -> 
props.keySet().removeAll(properties));
+  }
+
+  private boolean updateProperties(Namespace namespace, Consumer<Map<String, 
String>> action) {
+    getRef().checkMutable();
+    ContentKey key = ContentKey.of(namespace.levels());
     try {
-      withReference(
-              getApi()
-                  .updateProperties()
-                  
.namespace(org.projectnessie.model.Namespace.of(namespace.levels()))
-                  .removeProperties(properties))
-          .update();
-      refresh();
+      commitRetry(
+          "update namespace " + key,
+          true,
+          commitBuilder -> {
+            Map<ContentKey, Content> contentMap =

Review Comment:
   Nit: inline this to:
   ```java
   Content namespace = api.getContent().reference(getReference()).key(key).get().get(key);
   ```



##########
nessie/src/test/java/org/apache/iceberg/nessie/TestNessieIcebergClient.java:
##########
@@ -91,6 +112,412 @@ public void testWithReferenceAfterRecreatingBranch()
     Assertions.assertThat(client.withReference(branch, 
null)).isNotEqualTo(client);
   }
 
+  @Test
+  public void testCreateNamespace() throws NessieConflictException, 
NessieNotFoundException {
+    String branch = "createNamespaceBranch";
+    createBranch(branch);
+    Map<String, String> catalogOptions =
+        Map.of(
+            CatalogProperties.USER, "iceberg-user",
+            CatalogProperties.APP_ID, "iceberg-nessie");
+    NessieIcebergClient client = new NessieIcebergClient(api, branch, null, 
catalogOptions);
+
+    client.createNamespace(Namespace.of("a"), Map.of());
+    
Assertions.assertThat(client.listNamespaces(Namespace.of("a"))).isNotNull();
+
+    List<LogResponse.LogEntry> entries = 
api.getCommitLog().refName(branch).get().getLogEntries();
+    Assertions.assertThat(entries)
+        .isNotEmpty()
+        .first()
+        .extracting(LogResponse.LogEntry::getCommitMeta)
+        .satisfies(

Review Comment:
   Nit: use `extracting()` and `.containsExactly()` here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org


Reply via email to